diff --git a/.github/workflows/automated-tests.yml b/.github/workflows/automated-tests.yml index f45451fd..6d5f4c16 100644 --- a/.github/workflows/automated-tests.yml +++ b/.github/workflows/automated-tests.yml @@ -29,7 +29,6 @@ env: VALENTINA_LOG_FILE: "/tmp/valentina.log" VALENTINA_TEST_MONGO_URI: "mongodb://localhost:27017" VALENTINA_TEST_MONGO_DATABASE_NAME: "valentina-test" - VALENTINA_DB_PATH: "valentina.sqlite" # TODO: Depreciated concurrency: group: ${{ github.workflow }}-${{ github.ref }} diff --git a/.github/workflows/create-release.yml b/.github/workflows/create-release.yml index ed8274cc..1f71ee71 100644 --- a/.github/workflows/create-release.yml +++ b/.github/workflows/create-release.yml @@ -6,9 +6,6 @@ on: tags: - "v*" # Push events to matching v*, i.e. v1.0, v20.15.10 -env: - VALENTINA_LOG_FILE: "/tmp/valentina.log" # pass tests - concurrency: group: ${{ github.workflow }}-${{ github.ref }} cancel-in-progress: true diff --git a/.github/workflows/publish-to-ghcr.yml b/.github/workflows/publish-to-ghcr.yml index f7b57f5f..71e07351 100644 --- a/.github/workflows/publish-to-ghcr.yml +++ b/.github/workflows/publish-to-ghcr.yml @@ -14,7 +14,6 @@ concurrency: env: REGISTRY: ghcr.io IMAGE_NAME: ${{ github.repository }} - VALENTINA_LOG_FILE: "/tmp/valentina.log" # Pass tests jobs: build-and-push-image: diff --git a/Dockerfile b/Dockerfile index 921848d0..5a8f0004 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,7 +17,7 @@ WORKDIR /app COPY . /app # Install git -RUN apt-get update && apt-get install -y git sqlite3 tzdata +RUN apt-get update && apt-get install -y git tzdata # Install Poetry RUN pip install poetry diff --git a/poetry.lock b/poetry.lock index 8512cd10..cf5daba9 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1200,16 +1200,6 @@ pygments = ">=2.12.0" [package.extras] dev = ["black", "hypothesis", "mypy", "pygments (>=2.14.0)", "pytest", "pytest-cov", "pytest-timeout", "ruff", "tox", "types-pygments"] -[[package]] -name = "peewee" -version = "3.17.0" -description = "a little orm" -optional = false -python-versions = "*" -files = [ - {file = "peewee-3.17.0.tar.gz", hash = "sha256:3a56967f28a43ca7a4287f4803752aeeb1a57a08dee2e839b99868181dfb5df8"}, -] - [[package]] name = "platformdirs" version = "3.11.0" @@ -2250,4 +2240,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = ">=3.11,<3.13" -content-hash = "104eab9eb6dd7b7fd7b22decd04656372e57a0e7d1dc17b5da201cc2edaaa1b2" +content-hash = "96cf6712f32eb0bcfc4fb4182035175482454f8d3ebac9c454951ec04c270328" diff --git a/pyproject.toml b/pyproject.toml index b82fa2c0..ae587c13 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,6 @@ inflect = "^7.0.0" loguru = "^0.7.2" numpy = "^1.26.1" - peewee = "^3.17.0" py-cord = "^2.4.1" pydantic = "^2.4.2" python = ">=3.11,<3.13" diff --git a/src/valentina/main.py b/src/valentina/main.py index 6eabcd31..58318f60 100644 --- a/src/valentina/main.py +++ b/src/valentina/main.py @@ -44,7 +44,6 @@ def version_callback(value: bool) -> None: logging.getLogger("discord.webhook").setLevel(level=CONFIG["VALENTINA_LOG_LEVEL_HTTP"].upper()) logging.getLogger("discord.client").setLevel(level=CONFIG["VALENTINA_LOG_LEVEL_HTTP"].upper()) logging.getLogger("faker").setLevel(level="INFO") -logging.getLogger("peewee").setLevel(level="INFO") for service in ["urllib3", "boto3", "botocore", "s3transfer"]: logging.getLogger(service).setLevel(level=CONFIG["VALENTINA_LOG_LEVEL_AWS"].upper()) diff --git a/src/valentina/models/errors.py b/src/valentina/models/errors.py index 3050d130..141f4ffb 100644 --- a/src/valentina/models/errors.py +++ b/src/valentina/models/errors.py @@ -4,7 +4,6 @@ import discord from discord.ext import commands from loguru import logger -from peewee import DoesNotExist from valentina.constants import EmbedColor from valentina.utils import errors @@ -20,7 +19,7 @@ def __init__(self) -> None: self.channel: discord.TextChannel = None @staticmethod - def _handle_known_exceptions( # noqa: C901 + def _handle_known_exceptions( ctx: discord.ApplicationContext, error: Exception ) -> tuple[str | None, str | None, bool]: """Handle known exceptions and return user message, log message, and traceback flag. @@ -74,11 +73,6 @@ def _handle_known_exceptions( # noqa: C901 log_msg = f"ERROR: `{ctx.user.display_name}` tried to run `/{ctx.command}` and a character class was not found" show_traceback = True - if isinstance(error, DoesNotExist): - user_msg = "Sorry I couldn't find that. Potential bug has been reported." - log_msg = f"ERROR: `{ctx.user.display_name}` tried to run `/{ctx.command}` with an invalid database ID" - show_traceback = True - if isinstance(error, errors.MessageTooLongError): user_msg = "Message too long to send. This is a bug has been reported." log_msg = "ERROR: Message too long to send. Check the logs for the message." diff --git a/src/valentina/models/sqlite_models.py b/src/valentina/models/sqlite_models.py deleted file mode 100644 index 697e4c7f..00000000 --- a/src/valentina/models/sqlite_models.py +++ /dev/null @@ -1,698 +0,0 @@ -"""Models for the database.""" -from __future__ import annotations - -from datetime import datetime, timezone - -from peewee import ( - BooleanField, - DateTimeField, - DeferredForeignKey, - DoesNotExist, - ForeignKeyField, - IntegerField, - Model, - TextField, -) -from playhouse.sqlite_ext import CSqliteExtDatabase, JSONField - -from valentina.constants import CONFIG -from valentina.utils import errors - - -# This is duplicated from valentina.utils.helpers to avoid circular imports -def time_now() -> datetime: - """Return the current time in UTC.""" - return datetime.now(timezone.utc).replace(microsecond=0) - - -# Instantiate Database -DATABASE = CSqliteExtDatabase( - CONFIG["VALENTINA_DB_PATH"], - pragmas={ - "journal_mode": "wal", - "cache_size": -1 * 64000, # 64MB - "foreign_keys": 1, - "ignore_check_constraints": 0, - "synchronous": 1, - }, -) - - -class BaseModel(Model): - """Base model for the database.""" - - class Meta: - """Meta class for the database, inherited by all subclasses.""" - - database = DATABASE - - def __str__(self) -> str: - """Return the string representation of the model.""" - return str(self.__dict__) - - -class DatabaseVersion(BaseModel): - """Database version model for the database.""" - - version = TextField() - date = DateTimeField(default=time_now) - - -class Guild(BaseModel): - """Guild model for the database.""" - - id = IntegerField(primary_key=True) # noqa: A003 - name = TextField() - created = DateTimeField(default=time_now) - data = JSONField(null=True) - - def __str__(self) -> str: - """Return the string representation of the model.""" - return f"[{self.id}] {self.name}" - - class Meta: - """Meta class for the model.""" - - table_name = "guilds" - - -class GuildUser(BaseModel): - """Table for storing information specific to users on a guild.""" - - guild = ForeignKeyField(Guild, backref="users") - user = IntegerField() - data = JSONField(null=True) - - def __str__(self) -> str: - """Return the string representation of the model.""" - return f"[{self.data['id']}] {self.data['display_name']}" - - def fetch_experience(self, campaign_id: int) -> tuple[int, int, int, int, int]: - """Fetch the experience for a user for a specific campaign. - - Args: - campaign_id (int): The campaign ID to fetch experience for. - - Returns: - tuple[int, int, int, int, int]: The campaign experience, campaign total experience, lifetime experience, campaign total cool points, and lifetime cool points. - """ - campaign_xp = self.data.get(f"{campaign_id}_experience", 0) - campaign_total_xp = self.data.get(f"{campaign_id}_total_experience", 0) - campaign_total_cp = self.data.get(f"{campaign_id}_total_cool_points", 0) - lifetime_xp = self.data.get("lifetime_experience", 0) - lifetime_cp = self.data.get("lifetime_cool_points", 0) - - return campaign_xp, campaign_total_xp, lifetime_xp, campaign_total_cp, lifetime_cp - - def add_experience(self, campaign_id: int, amount: int) -> tuple[int, int, int]: - """Set the experience for a user for a specific campaign. - - Args: - campaign_id (int): The campaign ID to set experience for. - amount (int): The amount of experience to set. - - Returns: - tuple[int, int, int]: The campaign experience, campaign total experience, and lifetime experience. - """ - ( - campaign_xp, - campaign_total_xp, - lifetime_xp, - _, - _, - ) = self.fetch_experience(campaign_id) - - new_xp = self.data[f"{campaign_id}_experience"] = campaign_xp + amount - new_total = self.data[f"{campaign_id}_total_experience"] = campaign_total_xp + amount - new_lifetime_total = self.data["lifetime_experience"] = lifetime_xp + amount - - self.save() - - return new_xp, new_total, new_lifetime_total - - def add_cool_points(self, campaign_id: int, amount: int) -> tuple[int, int]: - """Set the cool points for a user for a specific campaign. - - Args: - campaign_id (int): The campaign ID to set cool points for. - amount (int): The amount of cool points to set. - - Returns: - tuple[int, int]: The campaign total cool points and lifetime cool points. - """ - ( - _, - _, - _, - campaign_total_cp, - lifetime_cp, - ) = self.fetch_experience(campaign_id) - - new_total = self.data[f"{campaign_id}_total_cool_points"] = campaign_total_cp + amount - new_lifetime = self.data["lifetime_cool_points"] = lifetime_cp + amount - - self.save() - - return new_total, new_lifetime - - def spend_experience(self, campaign_id: int, amount: int) -> int: - """Spend experience for a user for a specific campaign. - - Args: - campaign_id (int): The campaign ID to spend experience for. - amount (int): The amount of experience to spend. - - Returns: - int: The new campaign experience. - """ - ( - campaign_xp, - _, - _, - _, - _, - ) = self.fetch_experience(campaign_id) - - new_xp = self.data[f"{campaign_id}_experience"] = campaign_xp - amount - - if new_xp < 0: - msg = f"Can not spend {amount} xp with only {campaign_xp} available" - raise errors.NotEnoughExperienceError(msg) - - self.save() - - return new_xp - - -class CharacterClass(BaseModel): - """Character Class model for the database.""" - - name = TextField(unique=True) - - class Meta: - """Meta class for the model.""" - - table_name = "character_classes" - - -class VampireClan(BaseModel): - """Vampire clans.""" - - name = TextField(unique=True) - - class Meta: - """Meta class for the model.""" - - table_name = "vampire_clans" - - -###### Traits ###### - - -class TraitCategory(BaseModel): - """Trait Category model for the database.""" - - name = TextField(unique=True) - - class Meta: - """Meta class for the model.""" - - table_name = "trait_categories" - - -class CustomTrait(BaseModel): - """Custom Trait model for the database.""" - - character = DeferredForeignKey("Character", backref="custom_traits") - created = DateTimeField(default=time_now) - modified = DateTimeField(default=time_now) - description = TextField(null=True) - name = TextField() - category = ForeignKeyField(TraitCategory, backref="custom_traits") - value = IntegerField(default=0) - max_value = IntegerField(default=0) - - class Meta: - """Meta class for the model.""" - - table_name = "custom_traits" - - -class Trait(BaseModel): - """Character Trait model for the database.""" - - name = TextField(unique=True) - category = ForeignKeyField(TraitCategory, backref="traits") - - class Meta: - """Meta class for the model.""" - - table_name = "traits" - - -###### Characters ###### - - -class CustomSection(BaseModel): - """Custom sections added to a character sheet.""" - - character = DeferredForeignKey("Character", backref="custom_sections") - created = DateTimeField(default=time_now) - modified = DateTimeField(default=time_now) - description = TextField(null=True) - title = TextField() - - class Meta: - """Meta class for the model.""" - - table_name = "custom_sections" - - -class Character(BaseModel): - """Character model for the database.""" - - # GENERAL #################################### - created = DateTimeField(default=time_now) - data = JSONField(null=True) - - # Foreign Keys ############################### - char_class = ForeignKeyField(CharacterClass, backref="characters") - guild = ForeignKeyField(Guild, backref="characters") - created_by = ForeignKeyField(GuildUser, backref="created_characters", null=True) - owned_by = ForeignKeyField(GuildUser, backref="owned_characters", null=True) - clan = ForeignKeyField(VampireClan, backref="characters", null=True) - - @property - def name(self) -> str: - """Return the name of the character including their nickname.""" - first_name = self.data.get("first_name", "") - last_name = self.data.get("last_name", "") - nickname = self.data.get("nickname", "") - - display_name = f"{first_name.title()}" - display_name += f" ({nickname.title()})" if nickname else "" - display_name += f" {last_name.title()}" if last_name else "" - - return display_name - - @property - def full_name(self) -> str: - """Return the first and last name of the character.""" - first_name = self.data.get("first_name", "") - last_name = self.data.get("last_name", "") - - display_name = f"{first_name.title()}" - display_name += f" {last_name.title()}" if last_name else "" - return display_name - - @property - def freebie_points(self) -> int: - """Return the number of freebie points the character has available.""" - return self.data.get("freebie_points", 0) - - @property - def class_name(self) -> str: - """Return the character's class from the char_class table.""" - return self.char_class.name - - @property - def clan_name(self) -> str: - """Return the character's clan from the vampire_clans table.""" - return self.clan.name - - @property - def traits_list(self) -> list[Trait | CustomTrait]: - """Fetch all traits for this character. - - Returns: - list[Trait | CustomTrait]: List of all traits and custom traits. - """ - all_traits = [] - - all_traits.extend( - [tv.trait for tv in TraitValue.select().where(TraitValue.character == self)] - ) - - all_traits.extend(list(self.custom_traits)) - - return sorted(set(all_traits), key=lambda x: x.name) - - @property - def traits_dict(self) -> dict[str, list[Trait | CustomTrait]]: - """Fetch all traits for this character. - - Returns: - dict[str, list[Trait | CustomTrait]]: Dictionary of traits and custom traits with trait category as the key. - """ - all_traits: dict[str, list[Trait | CustomTrait]] = {} - - for tv in TraitValue.select().where(TraitValue.character == self): - category = str(tv.trait.category.name) - all_traits.setdefault(category, []) - all_traits[category].append(tv.trait) - - for ct in self.custom_traits: - category = str(ct.category.name) - all_traits.setdefault(category, []) - all_traits[category].append(ct) - - return all_traits - - @property - def all_trait_values(self) -> dict[str, list[tuple[str, int, int, str]]]: - """Fetch all trait values for a character inclusive of common and custom for display on a character sheet. - - Returns: - dict[str, list[tuple[str, int, int, str]]]: Dictionary key is category. Values are a tuple of (trait name, trait value, trait max value, trait dots) - - Example: - { - "PHYSICAL": [("Strength", 3, 5, "●●●○○"), ("Agility", 2, 5, "●●●○○")], - "SOCIAL": [("Persuasion", 1, 5, "●○○○○")] - } - """ - from valentina.utils.helpers import get_max_trait_value, num_to_circles # noqa: PLC0415 - - all_traits: dict[str, list[tuple[str, int, int, str]]] = {} - - for category, traits in self.traits_dict.items(): - all_traits.setdefault(category, []) - - for trait in traits: - value = self.get_trait_value(trait) - - max_value = get_max_trait_value(trait=trait.name, category=category) - dots = num_to_circles(value, max_value) - all_traits[category].append((trait.name, value, max_value, dots)) - - return all_traits - - def get_trait_value(self, trait: Trait | CustomTrait) -> int: - """Return the character's value of a trait. If the trait is not found, return 0. - - Returns: - int: The character's value of the trait. - """ - try: - if isinstance(trait, Trait): - return TraitValue.get(TraitValue.character == self, TraitValue.trait == trait).value - except DoesNotExist: - return 0 - - return trait.value # custom traits - - @property - def is_active(self) -> bool: - """Return True if the character is set as active.""" - return self.data.get("is_active", False) - - @property - def is_alive(self) -> bool: - """Return True if the character is alive.""" - return self.data.get("is_alive", True) - - def kill(self) -> None: - """Set the character as dead.""" - self.data["is_alive"] = False - self.data["is_active"] = False - self.save() - - def set_trait_value(self, trait: Trait | CustomTrait, value: int) -> None: - """Set the character's value of a trait.""" - if isinstance(trait, CustomTrait): - trait.value = value - trait.modified = time_now() - trait.save() - - elif isinstance(trait, Trait): - trait_value, created = TraitValue.get_or_create( - character=self, trait=trait, defaults={"value": value, "modified": time_now()} - ) - - if not created: - trait_value.value = value - trait_value.modified = time_now() - trait_value.save() - - self.data["modified"] = str(time_now()) - self.save() - - def __str__(self) -> str: - """Return the string representation of the model.""" - return f"[{self.id}] {self.name}" - - class Meta: - """Meta class for the model.""" - - table_name = "characters" - - -###### Macros ###### - - -class Macro(BaseModel): - """Macros for quick dice rolls.""" - - name = TextField() - abbreviation = TextField() - description = TextField(null=True) - created = DateTimeField(default=time_now) - modified = DateTimeField(default=time_now) - guild = ForeignKeyField(Guild, backref="macros") - user = ForeignKeyField(GuildUser, backref="macros") - - def remove(self) -> None: - """Delete the macro and associated macro traits.""" - for mt in self.traits: - mt.delete_instance() - - super().delete_instance() - - class Meta: - """Meta class for the model.""" - - table_name = "macros" - - -class MacroTrait(BaseModel): - """Join table for Macro and Trait.""" - - macro = ForeignKeyField(Macro, backref="traits") - trait = ForeignKeyField(Trait, backref="macros", null=True) - custom_trait = ForeignKeyField(CustomTrait, backref="macros", null=True) - - @classmethod - def create_from_trait_name(cls, macro: Macro, trait_name: str) -> MacroTrait: - """Create a MacroTrait for the given macro and trait_name.""" - try: - trait = Trait.get(Trait.name == trait_name) - return cls.create(macro=macro, trait=trait) - except DoesNotExist: - custom_trait = CustomTrait.get(CustomTrait.name == trait_name) - return cls.create(macro=macro, custom_trait=custom_trait) - - @classmethod - def create_from_trait(cls, macro: Macro, trait: Trait | CustomTrait) -> MacroTrait: - """Create a MacroTrait for the given macro and trait.""" - if isinstance(trait, Trait): - return cls.create(macro=macro, trait=trait) - - return cls.create(macro=macro, custom_trait=trait) - - class Meta: - """Meta class for the model.""" - - table_name = "macro_traits" - indexes = ( - (("macro", "trait"), False), - (("macro", "custom_trait"), False), - ) - - -###### Campaign Models ###### - - -class Campaign(BaseModel): - """Campaign model for the database.""" - - guild = ForeignKeyField(Guild, backref="campaigns") - created = DateTimeField(default=time_now) - modified = DateTimeField(default=time_now) - name = TextField(unique=True) - description = TextField(null=True) - current_date = DateTimeField(null=True, formats=["%Y-%m-%d"]) - is_active = BooleanField(default=False) - data = JSONField(null=True) - - class Meta: - """Meta class for the model.""" - - table_name = "campaigns" - - -class CampaignNPC(BaseModel): - """NPC model for the database.""" - - campaign = ForeignKeyField(Campaign, backref="npcs") - created = DateTimeField(default=time_now) - modified = DateTimeField(default=time_now) - - name = TextField() - description = TextField(null=True) - npc_class = TextField(null=True) - data = JSONField(null=True) - - class Meta: - """Meta class for the model.""" - - table_name = "campaign_npcs" - - def campaign_display(self) -> str: - """Return the display for campaign overview.""" - display = f"**{self.name}**" - display += f" ({self.npc_class})" if self.npc_class else "" - display += f"\n{self.description}" if self.description else "" - - return display - - -class CampaignChapter(BaseModel): - """Campaign Chapter model for the database.""" - - campaign = ForeignKeyField(Campaign, backref="chapters") - created = DateTimeField(default=time_now) - modified = DateTimeField(default=time_now) - chapter_number = IntegerField() - name = TextField(null=True) - short_description = TextField(null=True) - description = TextField(null=True) - data = JSONField(null=True) - - class Meta: - """Meta class for the model.""" - - table_name = "campaign_chapters" - - def campaign_display(self) -> str: - """Return the display for campaign overview.""" - display = f"**{self.chapter_number}: __{self.name}__**" - display += f"\n{self.description}" if self.description else "" - - return display - - -class CampaignNote(BaseModel): - """Notes for a campaign.""" - - campaign = ForeignKeyField(Campaign, backref="notes") - chapter = ForeignKeyField(CampaignChapter, backref="notes", null=True) - user = ForeignKeyField(GuildUser, backref="campaign_notes") - created = DateTimeField(default=time_now) - modified = DateTimeField(default=time_now) - name = TextField() - description = TextField(null=True) - data = JSONField(null=True) - - class Meta: - """Meta class for the model.""" - - table_name = "campaign_notes" - - def campaign_display(self) -> str: - """Return the display for campaign overview.""" - display = f"**{self.name}**\n" - display += f"{self.description}" if self.description else "" - - return display - - -###### Dice Rolls ###### - - -class RollThumbnail(BaseModel): - """Thumbnail for a roll.""" - - url = TextField() - roll_type = TextField() - created = DateTimeField(default=time_now) - guild = ForeignKeyField(Guild, backref="roll_thumbnails") - user = ForeignKeyField(GuildUser, backref="roll_thumbnails") - data = JSONField(null=True) - - class Meta: - """Meta class for the model.""" - - table_name = "roll_thumbnails" - - -class RollStatistic(BaseModel): - """Track roll results for statistics.""" - - user = ForeignKeyField(GuildUser, backref="roll_statistics") - guild = ForeignKeyField(Guild, backref="roll_statistics") - character = ForeignKeyField(Character, backref="roll_statistics", null=True) - result = TextField() - pool = IntegerField() - difficulty = IntegerField() - date_rolled = DateTimeField(default=time_now) - data = JSONField(null=True) - - -class RollProbability(BaseModel): - """Track proability of roll results.""" - - pool = IntegerField() - difficulty = IntegerField() - dice_size = IntegerField() - created = DateTimeField(default=time_now) - data = JSONField(null=True) - - -###### Lookup Tables ###### - - -class TraitValue(BaseModel): - """Join table for Character and Trait.""" - - character = ForeignKeyField(Character, backref="trait_values") - trait = ForeignKeyField(Trait, backref="trait_values", null=True) - value = IntegerField(default=0) - modified = DateTimeField(default=time_now) - - class Meta: - """Meta class for the model.""" - - table_name = "trait_values" - indexes = ((("character", "trait", "value"), False),) - - -class TraitClass(BaseModel): - """Join table for Trait and CharacterClass.""" - - trait = ForeignKeyField(Trait, backref="classes") - character_class = ForeignKeyField(CharacterClass, backref="traits") - - class Meta: - """Meta class for the model.""" - - table_name = "trait_classes" - indexes = ( - (("trait", "character_class"), False), - (("character_class", "trait"), False), - ) - - -class TraitCategoryClass(BaseModel): - """Join table for TraitCategory and CharacterClass.""" - - category = ForeignKeyField(TraitCategory, backref="classes") - character_class = ForeignKeyField(CharacterClass, backref="trait_categories") - - class Meta: - """Meta class for the model.""" - - table_name = "trait_category_classes" - indexes = ( - (("category", "character_class"), False), - (("character_class", "category"), False), - ) diff --git a/src/valentina/utils/migrate_to_mongo.py b/src/valentina/utils/migrate_to_mongo.py deleted file mode 100644 index 82676cca..00000000 --- a/src/valentina/utils/migrate_to_mongo.py +++ /dev/null @@ -1,376 +0,0 @@ -"""One time migration from sqlite to mongodb.""" -from datetime import datetime - -from loguru import logger - -from valentina.constants import ( - PermissionManageCampaign, - PermissionsGrantXP, - PermissionsKillCharacter, - PermissionsManageTraits, - RollResultType, - TraitCategory, -) -from valentina.models import ( - Campaign, - CampaignChapter, - CampaignExperience, - CampaignNote, - CampaignNPC, - Character, - CharacterSheetSection, - CharacterTrait, - GlobalProperty, - Guild, - GuildChannels, - GuildPermissions, - GuildRollResultThumbnail, - RollStatistic, - User, -) -from valentina.models.sqlite_models import Campaign as SqliteCampaign -from valentina.models.sqlite_models import CampaignChapter as SqliteCampaignChapter -from valentina.models.sqlite_models import CampaignNote as SqliteCampaignNote -from valentina.models.sqlite_models import CampaignNPC as SqliteCampaignNPC -from valentina.models.sqlite_models import Character as SqliteCharacter -from valentina.models.sqlite_models import CustomSection as SqliteCustomSection -from valentina.models.sqlite_models import DatabaseVersion as SQLVersion -from valentina.models.sqlite_models import Guild as SqliteGuild -from valentina.models.sqlite_models import GuildUser as SqliteUser -from valentina.models.sqlite_models import RollStatistic as SqliteRollStatistic -from valentina.models.sqlite_models import RollThumbnail as SqliteRollThumbnail -from valentina.utils.helpers import get_max_trait_value - - -class Migrate: - """One time migration from sqlite to mongodb.""" - - def __init__(self, config: dict) -> None: - """Initialize.""" - self.config = config - self.campaign_map: list[tuple[int, str]] = [] - - @staticmethod - async def _migrate_version() -> None: - """Migrate version.""" - property_document = GlobalProperty() - await property_document.insert() - - for v in SQLVersion.select().order_by(SQLVersion.id.asc()): - property_document.versions.append(v.version) - - await property_document.save() - - @staticmethod - async def _migrate_guilds() -> None: - """Migrate guilds.""" - # Get all guilds - guilds = SqliteGuild.select() - for sqlguild in guilds: - # Create permissions object - permissions = GuildPermissions( - manage_traits=PermissionsManageTraits( - sqlguild.data.get("permissions_edit_trait", None) - ), - grant_xp=PermissionsGrantXP(sqlguild.data.get("permissions_edit_xp", None)), - manage_campaigns=PermissionManageCampaign( - sqlguild.data.get("permissions_manage_campaigns", None) - ), - kill_character=PermissionsKillCharacter( - sqlguild.data.get("permissions_kill_character", None) - ), - ) - - channels = GuildChannels( - audit_log=sqlguild.data.get("audit_log_channel_id", None), - changelog=sqlguild.data.get("changelog_channel_id", None), - error_log=sqlguild.data.get("error_log_channel_id", None), - storyteller=sqlguild.data.get("storyteller_channel_id", None), - ) - - # Get roll thumbnails - roll_thumbnails = [ - GuildRollResultThumbnail( - url=thumbnail.url, - roll_type=RollResultType[thumbnail.roll_type], - user=SqliteUser.get(SqliteUser.id == thumbnail.user).user, - date_created=datetime.strptime(thumbnail.created, "%Y-%m-%d %H:%M:%S%z"), - ) - for thumbnail in SqliteRollThumbnail.select().where( - SqliteRollThumbnail.guild == sqlguild - ) - ] - - # Create mongo guild object - mongo_guild = Guild( - id=sqlguild.id, - name=sqlguild.name, - changelog_posted_version=sqlguild.data.get("changelog_posted_version", None), - date_created=datetime.strptime(sqlguild.created, "%Y-%m-%d %H:%M:%S%z"), - permissions=permissions, - channels=channels, - roll_result_thumbnails=roll_thumbnails, - ) - - # insert guild into mongo - await mongo_guild.insert() - logger.debug(f"MIGRATION: Insert guild `{mongo_guild.name}` into mongo") - - async def _migrate_campaigns(self) -> None: - """Migrate campaigns.""" - for sqlcampaign in SqliteCampaign.select(): - # fetch the guild object - guild = await Guild.get(sqlcampaign.guild.id) - - mongo_campaign = Campaign( - guild=sqlcampaign.guild.id, - name=sqlcampaign.name if sqlcampaign.name else "", - date_created=datetime.strptime(sqlcampaign.created, "%Y-%m-%d %H:%M:%S%z"), - date_modified=datetime.strptime(sqlcampaign.created, "%Y-%m-%d %H:%M:%S%z"), - description=sqlcampaign.description if sqlcampaign.description else "", - date_in_game=datetime.strptime(sqlcampaign.current_date, "%Y-%m-%d %H:%M:%S"), - ) - await mongo_campaign.insert() - - # associate the campaign with the guild - guild.campaigns.append(mongo_campaign) - if sqlcampaign.is_active: - guild.active_campaign = mongo_campaign - - await guild.save() - - # Create mapping to be used for user experience - self.campaign_map.append((sqlcampaign.id, str(mongo_campaign.id))) - - # Add chapters, npcs, and notes - for sqlchapter in SqliteCampaignChapter.select().where( - SqliteCampaignChapter.campaign == sqlcampaign - ): - chapter = CampaignChapter( - description_long=sqlchapter.description, - description_short=sqlchapter.short_description, - name=sqlchapter.name, - number=sqlchapter.chapter_number, - date_created=datetime.strptime(sqlchapter.created, "%Y-%m-%d %H:%M:%S%z"), - ) - mongo_campaign.chapters.append(chapter) - - for npc in SqliteCampaignNPC.select().where(SqliteCampaignNPC.campaign == sqlcampaign): - new_npc = CampaignNPC( - description=npc.description, name=npc.name, npc_class=npc.npc_class - ) - mongo_campaign.npcs.append(new_npc) - - for note in SqliteCampaignNote.select().where( - SqliteCampaignNote.campaign == sqlcampaign - ): - new_note = CampaignNote(description=note.description, name=note.name) - mongo_campaign.notes.append(new_note) - await mongo_campaign.save() - - logger.debug(f"MIGRATION: Insert campaign `{mongo_campaign.name}` into mongo") - - async def _migrate_users(self) -> None: - for sqluser in SqliteUser.select(): - # build base user object - mongo_user = User( - id=sqluser.user, - name=sqluser.data.get("display_name", None), - date_created=datetime.strptime( - sqluser.data.get("modified", None), "%Y-%m-%d %H:%M:%S%z" - ), - lifetime_cool_points=sqluser.data.get("lifetime_cool_points", None), - lifetime_experience=sqluser.data.get("lifetime_experience", None), - ) - await mongo_user.insert() - - # Add guilds - mongo_user.guilds.append(sqluser.guild.id) - - # Add campaign experience - for old, new in self.campaign_map: - if sqluser.data.get(f"{old}_experience", None): - xp_object = CampaignExperience( - xp_current=sqluser.data.get(f"{old}_experience", None), - cool_points=sqluser.data.get(f"{old}_total_cool_points", None), - xp_total=sqluser.data.get(f"{old}_total_experience", None), - ) - mongo_user.campaign_experience[new] = xp_object - - # Save user - await mongo_user.save() - logger.debug(f"MIGRATION: Insert user `{mongo_user.name}` into mongo") - - @staticmethod - async def _migrate_characters() -> None: - """Migrate characters.""" - for sqlchar in SqliteCharacter.select(): - # Grab the user object - logger.info(f"MIGRATION: Migrating character `{sqlchar}`") - - owned_by_user = await User.get(sqlchar.owned_by.user) - creator_user = await User.get(sqlchar.created_by.user) - - character = Character( - char_class_name=sqlchar.char_class.name if sqlchar.char_class else "", - date_created=datetime.strptime(sqlchar.created, "%Y-%m-%d %H:%M:%S%z"), - date_modified=datetime.strptime( - sqlchar.data.get("modified", None), "%Y-%m-%d %H:%M:%S%z" - ), - guild=sqlchar.guild.id, - images=list(sqlchar.data["images"]) if sqlchar.data.get("images", False) else [], - is_alive=sqlchar.data.get("is_alive", None), - name_first=sqlchar.data.get("first_name", None), - name_last=sqlchar.data.get("last_name", None), - name_nick=sqlchar.data.get("nickname", None), - type_chargen=sqlchar.data.get("chargen_character", False), - type_debug=sqlchar.data.get("debug_character", False), - type_developer=sqlchar.data.get("developer_character", False), - type_player=sqlchar.data.get("player_character", False), - type_storyteller=sqlchar.data.get("storyteller_character", False), - user_creator=creator_user.id if creator_user else None, - user_owner=owned_by_user.id if owned_by_user else None, - bio=sqlchar.data.get("bio", None), - age=sqlchar.data.get("age", None), - auspice=sqlchar.data.get("auspice", None), - breed=sqlchar.data.get("breed", None), - clan_name=sqlchar.clan.name if sqlchar.clan else "", - concept_name=sqlchar.data.get("concept_db", None), - creed_name=sqlchar.data.get("creed", None), - demeanor=sqlchar.data.get("demeanor", None), - dob=datetime.strptime(sqlchar.data["date_of_birth"], "%Y-%m-%d %H:%M:%S") - if sqlchar.data.get("date_of_birth", None) - else None, - essence=sqlchar.data.get("essence", None), - generation=sqlchar.data.get("generation", None), - nature=sqlchar.data.get("nature", None), - sire=sqlchar.data.get("sire", None), - tradition=sqlchar.data.get("tradition", None), - tribe=sqlchar.data.get("tribe", None), - ) - await character.insert() - - # Mark characters as active - if sqlchar.data.get("is_active", None): - owned_by_user.active_characters[str(character.guild)] = character - - # Add character to user character list - if owned_by_user: - owned_by_user.characters.append(character) - await owned_by_user.save() - else: - logger.warning(f"MIGRATION: No owner for character `{character.name}`") - - # Migration custom sections - for section in SqliteCustomSection.select().where( - SqliteCustomSection.character == sqlchar - ): - character.sheet_sections.append( - CharacterSheetSection(title=section.title, content=section.description) - ) - - # Assign trait values - all_trait_categories = [ - TraitCategory[trait.category.name.upper()] - for trait in sqlchar.traits_list - if trait.category.name.upper() != "ADVANTAGES" - ] - all_categories = sorted(set(all_trait_categories), key=lambda x: x.value.order) - - for category in all_categories: - for trait in sqlchar.traits_list: - if trait.category.name.upper() == category.name.upper(): - # Check if the trait is custom - if trait.name not in TraitCategory[ - category.name.upper() - ].value.COMMON and trait.name not in getattr( - TraitCategory[category.name.upper()].value, - character.char_class_name.upper(), - ): - is_custom = True - else: - is_custom = False - - # Create the new trait - new_trait = CharacterTrait( - category_name=trait.category.name.upper(), - character=str(character.id), - name=trait.name, - value=sqlchar.get_trait_value(trait), - is_custom=is_custom, - display_on_sheet=True, - max_value=get_max_trait_value(trait.name, trait.category.name), - ) - await new_trait.insert() - - # Add the trait to the character - character.traits.append(new_trait) - - # save the character - await character.save() - logger.debug(f"MIGRATION: Insert character `{character.name}` into mongo") - - @staticmethod - async def _migrate_roll_statistics() -> None: - """Migrate roll statistics.""" - for stat in SqliteRollStatistic.select(): - try: - sqlcharacter = stat.character if stat.character else None - except SqliteCharacter.DoesNotExist: - sqlcharacter = None - - if sqlcharacter: - character = await Character.find_one( - Character.name_first == sqlcharacter.data.get("first_name", "") - ) - else: - character = None - - new_stat = RollStatistic( - user=SqliteUser.get(SqliteUser.id == stat.user).user, - guild=stat.guild.id, - character=str(character.id) if character else None, - result=RollResultType[stat.result], - pool=stat.pool, - difficulty=stat.difficulty, - date_rolled=datetime.strptime(stat.date_rolled, "%Y-%m-%d %H:%M:%S%z"), - ) - await new_stat.insert() - - async def do_migration(self) -> None: - """Perform the migration.""" - if not await GlobalProperty.find().to_list(): - logger.info("MIGRATION: Migrate Version") - await self._migrate_version() - else: - logger.info("MIGRATION: Version already migrated") - - if not await Guild.find().to_list(): - logger.info("MIGRATION: Migrate Guilds") - await self._migrate_guilds() - else: - logger.info("MIGRATION: Guilds already migrated") - - if not await Campaign.find().to_list(): - logger.info("MIGRATION: Migrate Campaigns") - await self._migrate_campaigns() - else: - logger.info("MIGRATION: Campaigns already migrated") - - if not await User.find().to_list(): - logger.info("MIGRATION: Migrate Users") - await self._migrate_users() - else: - logger.info("MIGRATION: Users already migrated") - - if not await Character.find().to_list(): - logger.info("MIGRATION: Migrate Characters") - await self._migrate_characters() - else: - logger.info("MIGRATION: Characters already migrated") - - if not await RollStatistic.find().to_list(): - logger.info("MIGRATION: Migrate RollStatistic") - await self._migrate_roll_statistics() - else: - logger.info("MIGRATION: RollStatistic already migrated")