From a9887bda933d4286dfaa5210aa4cb5cee3b7cd77 Mon Sep 17 00:00:00 2001 From: Olwe Samuel Date: Tue, 23 Jul 2024 18:26:38 +0300 Subject: [PATCH] Created numeric cve ids --- migrations/versions/5df43dc932dd_.py | 46 +++++++++++++++ tests/__init__.py | 83 ++++++++++++++++++++++++++++ tests/test_commands.py | 27 +++++++++ tests/test_routes.py | 83 +--------------------------- webapp/commands.py | 12 ++++ webapp/models.py | 26 +++++++++ 6 files changed, 196 insertions(+), 81 deletions(-) create mode 100644 migrations/versions/5df43dc932dd_.py create mode 100644 tests/test_commands.py create mode 100644 webapp/commands.py diff --git a/migrations/versions/5df43dc932dd_.py b/migrations/versions/5df43dc932dd_.py new file mode 100644 index 0000000..ed2a8e7 --- /dev/null +++ b/migrations/versions/5df43dc932dd_.py @@ -0,0 +1,46 @@ +"""empty message + +Revision ID: 5df43dc932dd +Revises: c27fec1bfee2 +Create Date: 2024-07-23 14:49:17.928909 + +""" + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "5df43dc932dd" +down_revision = "c27fec1bfee2" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column( + "cve", sa.Column("numerical_id", sa.Numeric(), nullable=True) + ) + op.create_index( + op.f("ix_cve_numerical_id"), "cve", ["numerical_id"], unique=False + ) + op.drop_index("notices_published_desc_idx", table_name="notice") + op.drop_index("notices_published_idx", table_name="notice") + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_index( + "notices_published_idx", "notice", ["published"], unique=False + ) + op.create_index( + "notices_published_desc_idx", + "notice", + [sa.text("published DESC")], + unique=False, + ) + op.drop_index(op.f("ix_cve_numerical_id"), table_name="cve") + op.drop_column("cve", "numerical_id") + # ### end Alembic commands ### diff --git a/tests/__init__.py b/tests/__init__.py index e69de29..521129e 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -0,0 +1,83 @@ +# Standard library +from contextlib import redirect_stderr +import io +import os +import unittest +import warnings + +# Packages +from sqlalchemy_utils import database_exists, create_database +import flask_migrate + + +# Local +from tests.fixtures.models import make_models + +""" +Monkey-patching before importing the main application +=== + +Get the database connection string from the TEST_DATABASE_URL environment +variable. This variabel is required, as it's important not to accidentally +wipe out a real database. + +Replace the authorization_required view decorator with a transparent function +to disable authorization checks for testing privileged views. +This is not ideal, as it means we're not testing the actual authorization +functionality, but I don't know of a good way to do that right now. +""" + +from webapp import auth +from tests.helpers import transparent_decorator + +auth.authorization_required = transparent_decorator +os.environ["DATABASE_URL"] = os.environ["TEST_DATABASE_URL"] + +from webapp.app import app, db # noqa: E402 + + +# Create database if it doesn't exist +with app.app_context(): + if not database_exists(db.engine.url): + create_database(db.engine.url) + + +# Suppress annoying ResourceWarnings +warnings.filterwarnings(action="ignore", category=ResourceWarning) + + +class BaseTestCase(unittest.TestCase): + def setUp(self): + app.testing = True + + # Set up app context + self.context = app.app_context() + self.context.push() + + # Clear DB + db.drop_all() + with redirect_stderr(io.StringIO()): + flask_migrate.stamp(revision="base") + + # Prepare DB + with redirect_stderr(io.StringIO()): + flask_migrate.upgrade() + + # Import data + self.models = make_models() + db.session.add(self.models["cve"]) + db.session.add(self.models["notice"]) + db.session.add(self.models["release"]) + db.session.add(self.models["package"]) + db.session.add(self.models["status"]) + db.session.commit() + + self.client = app.test_client() + return super().setUp() + + def tearDown(self): + db.session.close() + + self.context.pop() + + return super().tearDown() diff --git a/tests/test_commands.py b/tests/test_commands.py new file mode 100644 index 0000000..53ba82e --- /dev/null +++ b/tests/test_commands.py @@ -0,0 +1,27 @@ +from click.testing import CliRunner +import unittest + +from tests import BaseTestCase, db +from webapp.commands import insert_numerical_cve_ids +from webapp.models import CVE + + +class TestCommands(BaseTestCase): + runner = CliRunner() + + def test_upsert_numerical_cve_ids(self): + """ + Numerical CVE ids should be inserted correctly + """ + result = self.runner.invoke(insert_numerical_cve_ids) + assert result.exit_code == 0 + + # Check that the numerical cve id was inserted + new_cve = ( + db.session.query(CVE).filter(CVE.id == "CVE-1111-0001").first() + ) + assert new_cve.numerical_id == int("11110001") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_routes.py b/tests/test_routes.py index 19c95b4..361bf79 100644 --- a/tests/test_routes.py +++ b/tests/test_routes.py @@ -1,88 +1,9 @@ -# Standard library -from contextlib import redirect_stderr -import io -import os import unittest -import warnings - -# Packages -from sqlalchemy_utils import database_exists, create_database -import flask_migrate - - -# Local -from tests.fixtures.models import make_models +from tests import BaseTestCase from tests.fixtures import payloads -""" -Monkey-patching before importing the main application -=== - -Get the database connection string from the TEST_DATABASE_URL environment -variable. This variabel is required, as it's important not to accidentally -wipe out a real database. - -Replace the authorization_required view decorator with a transparent function -to disable authorization checks for testing privileged views. -This is not ideal, as it means we're not testing the actual authorization -functionality, but I don't know of a good way to do that right now. -""" - -from webapp import auth -from tests.helpers import transparent_decorator - -auth.authorization_required = transparent_decorator -os.environ["DATABASE_URL"] = os.environ["TEST_DATABASE_URL"] - -from webapp.app import app, db # noqa: E402 - - -# Create database if it doesn't exist -with app.app_context(): - if not database_exists(db.engine.url): - create_database(db.engine.url) - - -# Suppress annoying ResourceWarnings -warnings.filterwarnings(action="ignore", category=ResourceWarning) - - -class TestRoutes(unittest.TestCase): - def setUp(self): - app.testing = True - - # Set up app context - self.context = app.app_context() - self.context.push() - - # Clear DB - db.drop_all() - with redirect_stderr(io.StringIO()): - flask_migrate.stamp(revision="base") - - # Prepare DB - with redirect_stderr(io.StringIO()): - flask_migrate.upgrade() - - # Import data - self.models = make_models() - db.session.add(self.models["cve"]) - db.session.add(self.models["notice"]) - db.session.add(self.models["release"]) - db.session.add(self.models["package"]) - db.session.add(self.models["status"]) - db.session.commit() - - self.client = app.test_client() - return super().setUp() - - def tearDown(self): - db.session.close() - - self.context.pop() - - return super().tearDown() +class TestRoutes(BaseTestCase): def test_spec(self): response = self.client.get("/security/api/spec.json") diff --git a/webapp/commands.py b/webapp/commands.py new file mode 100644 index 0000000..0a550a0 --- /dev/null +++ b/webapp/commands.py @@ -0,0 +1,12 @@ +import click +from webapp.app import app +from webapp.models import ( + upsert_numerical_cve_ids, +) + + +@app.cli.command("insert-numerical-cve-ids") +def insert_numerical_cve_ids(): + """Management script for the Wiki application.""" + upsert_numerical_cve_ids() + click.echo("Numerical CVE ids inserted successfully.") diff --git a/webapp/models.py b/webapp/models.py index 1e77717..18f53c0 100644 --- a/webapp/models.py +++ b/webapp/models.py @@ -1,5 +1,6 @@ from collections import defaultdict from datetime import datetime +import re from sqlalchemy import ( Boolean, @@ -9,6 +10,7 @@ Float, ForeignKey, JSON, + Numeric, String, Table, func, @@ -51,6 +53,7 @@ class CVE(db.Model): __tablename__ = "cve" id = Column(String, primary_key=True) + numerical_id = Column(Numeric, index=True) published = Column(DateTime) description = Column(String) ubuntu_description = Column(String) @@ -143,6 +146,29 @@ def notices_ids(self): return [notice.id for notice in self.notices] +def convert_cve_id_to_numerical_id(cve_id): + """ + Convert a CVE id to a numerical id. + """ + id_match = re.match(r"^[A-Z]{1,3}-(\d*)-(\d*)", cve_id) + # Upsert numerical_id + return int(id_match.group(1) + id_match.group(2)) + + +def upsert_numerical_cve_ids(): + """ + Insert or update the numerical_id column using the CVE id. + e.g 'CVE-2025-12345' -> 202512345 + """ + all_cves = db.session.query(CVE).all() + updated_cves = [] + for cve in all_cves: + cve.numerical_id = convert_cve_id_to_numerical_id(cve.id) + updated_cves.append(cve) + db.session.add_all(updated_cves) + db.session.commit() + + class Notice(db.Model): __tablename__ = "notice"