tests: finish testing index creation
bolinocroustibat committed Sep 6, 2024
1 parent 8cc5967 commit 95c5e2a
Showing 5 changed files with 142 additions and 18 deletions.
9 changes: 5 additions & 4 deletions tests/conftest.py
@@ -23,7 +23,8 @@

 DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5433/postgres")
 RESOURCE_ID = "c4e3a9fb-4415-488e-ba57-d05269b27adf"
-RESOURCE_ID_EXCEPTION = "d4e3a9fb-4415-488e-ba57-d05269b27adf"
+RESOURCE_EXCEPTION_ID = "d4e3a9fb-4415-488e-ba57-d05269b27adf"
+RESOURCE_EXCEPTION_TABLE_INDEXES = {"Nom": "index", "N° de certificat": "index"}
 DATASET_ID = "601ddcfc85a59c3a45c2435a"
 pytestmark = pytest.mark.asyncio

@@ -145,11 +146,11 @@ async def setup_catalog_with_resource_exception(setup_catalog):
     ['__id', 'Nom', 'Prenom', 'Societe', 'Adresse', 'CP', 'Ville', 'Tel1', 'Tel2', 'email', 'Organisme', 'Org Cofrac', 'Type de certificat', 'N° de certificat', 'Date début validité', 'Date fin validité']
     """
     await Resource.insert(
-        dataset_id=DATASET_ID, resource_id=RESOURCE_ID_EXCEPTION, url="http://example.com/"
+        dataset_id=DATASET_ID, resource_id=RESOURCE_EXCEPTION_ID, url="http://example.com/"
     )
     await ResourceException.insert(
-        resource_id=RESOURCE_ID_EXCEPTION,
-        table_indexes={"Nom": "index", "N° de certificat": "index"},
+        resource_id=RESOURCE_EXCEPTION_ID,
+        table_indexes=RESOURCE_EXCEPTION_TABLE_INDEXES,
     )


20 changes: 13 additions & 7 deletions tests/test_csv_analysis_exceptions.py
@@ -5,11 +5,12 @@
 import pytest
 from asyncpg import Record
 
-from tests.conftest import RESOURCE_ID_EXCEPTION
+from tests.conftest import RESOURCE_EXCEPTION_ID, RESOURCE_EXCEPTION_TABLE_INDEXES
 from udata_hydra import config
 from udata_hydra.analysis.csv import analyse_csv
 from udata_hydra.db.resource import Resource
 from udata_hydra.db.resource_exception import ResourceException
+from udata_hydra.utils.db import get_columns_with_indexes
 
 pytestmark = pytest.mark.asyncio

@@ -28,7 +29,7 @@ async def test_exception_analysis(
     config.override(MAX_FILESIZE_ALLOWED={"csv": 5000})
 
     # Create a previous fake check for the resource
-    check = await fake_check(resource_id=RESOURCE_ID_EXCEPTION)
+    check = await fake_check(resource_id=RESOURCE_EXCEPTION_ID)
     filename, expected_count = ("20190618-annuaire-diagnostiqueurs.csv", 45522)
     url = check["url"]
     table_name = hashlib.md5(url.encode("utf-8")).hexdigest()
@@ -37,23 +38,28 @@
     rmock.get(url, status=200, body=data)
 
     # Check resource status before analysis
-    resource = await Resource.get(RESOURCE_ID_EXCEPTION)
+    resource = await Resource.get(RESOURCE_EXCEPTION_ID)
     assert resource["status"] is None
 
     # Analyse the CSV
     await analyse_csv(check_id=check["id"])
 
     # Check resource status after analysis
-    resource = await Resource.get(RESOURCE_ID_EXCEPTION)
+    resource = await Resource.get(RESOURCE_EXCEPTION_ID)
     assert resource["status"] is None
 
     # Check the table has been created in CSV DB, with the expected number of rows, and get the columns
     row: Record = await db.fetchrow(f'SELECT *, count(*) over () AS count FROM "{table_name}"')
     assert row["count"] == expected_count
 
-    # Check if a index has been created for the table
-    indexes: Record = await db.fetchrow("SELECT * FROM pg_indexes WHERE tablename = $1", table_name)
-    log.debug(f"Indexes: {dict(indexes)}")
+    # Check if indexes have been created for the table
+    expected_columns_with_indexes = list(RESOURCE_EXCEPTION_TABLE_INDEXES.keys())
+    expected_columns_with_indexes.append("__id")
+    indexes: list[Record] | None = await get_columns_with_indexes(table_name)
+    assert indexes
+    for idx in indexes:
+        assert idx["table_name"] == table_name
+        assert idx["column_name"] in expected_columns_with_indexes
 
     # Check the profile has been saved in the tables_index
     profile = await db.fetchrow(
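
A stricter variant of the membership checks above (a sketch, not part of the commit) would pin down the exact set of indexed columns, since the "__id" primary key should be the only index besides the ones declared in the fixture:

    assert {idx["column_name"] for idx in indexes} == set(expected_columns_with_indexes)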
8 changes: 5 additions & 3 deletions udata_hydra/analysis/csv.py
@@ -238,19 +238,18 @@ def compute_create_table_query(
     # TODO: other index types. Not easy with sqlalchemy, maybe use raw sql?
 
     compiled_query = CreateTable(table).compile(dialect=asyncpg.dialect())
-    query = compiled_query.string
+    query: str = compiled_query.string
 
     # Add the index creation queries to the main query
     for index in table.indexes:
         log.debug(f'Creating {index_type} on column "{col_name}"')
         query_idx = CreateIndex(index).compile(dialect=asyncpg.dialect())
-        query = query + ";" + query_idx.string
+        query: str = query + ";" + query_idx.string
 
     # compiled query will want to write "%% mon pourcent" VARCHAR but will fail when querying "% mon pourcent"
     # also, "% mon pourcent" works well in pg as a column
     # TODO: dirty hack, maybe find an alternative
     query = query.replace("%%", "%")
-    log.debug(query)
     return query
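
Read outside the diff, the assembled query is easier to follow as a self-contained sketch; the table and column names below are hypothetical, while the imports match the ones this commit adds to cli.py:

    from sqlalchemy import Column, Integer, MetaData, String, Table
    from sqlalchemy.dialects.postgresql import asyncpg
    from sqlalchemy.schema import CreateIndex, CreateTable, Index

    metadata = MetaData()
    table = Table(
        "example",
        metadata,
        Column("__id", Integer, primary_key=True),
        Column("Nom", String),
    )
    table.append_constraint(Index("example_Nom_idx", "Nom"))

    # One SQL string: CREATE TABLE first, then each CREATE INDEX, ";"-separated
    query: str = CreateTable(table).compile(dialect=asyncpg.dialect()).string
    for index in table.indexes:
        query = query + ";" + CreateIndex(index).compile(dialect=asyncpg.dialect()).string
    # The compiler escapes "%" as "%%"; restore the literal "%" for asyncpg
    query = query.replace("%%", "%")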


@@ -337,8 +336,11 @@ async def csv_to_db(
     q = f'DROP TABLE IF EXISTS "{table_name}"'
     db = await context.pool("csv")
     await db.execute(q)
+
+    # Create table
     q = compute_create_table_query(table_name=table_name, columns=columns, indexes=table_indexes)
     await db.execute(q)
+
     # this use postgresql COPY from an iterator, it's fast but might be difficult to debug
     if not debug_insert:
         # NB: also see copy_to_table for a file source
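
The COPY branch above is truncated by the diff view; a hedged sketch of the asyncpg call such a path typically wraps (the records iterator and the column ordering are assumptions, not the commit's code):

    # Hypothetical reconstruction of the fast insert path:
    await db.copy_records_to_table(
        table_name,
        records=records,  # iterator of tuples, one per CSV row
        columns=list(columns.keys()),  # assumed to match the table definition
    )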
86 changes: 82 additions & 4 deletions udata_hydra/cli.py
@@ -6,17 +6,34 @@
 from tempfile import NamedTemporaryFile
 
 import aiohttp
-import asyncpg
+from asyncpg import Record, connect
 from humanfriendly import parse_size
 from minicli import cli, run, wrap
 from progressist import ProgressBar
+from sqlalchemy import (
+    JSON,
+    BigInteger,
+    Boolean,
+    Column,
+    Date,
+    DateTime,
+    Float,
+    Integer,
+    MetaData,
+    String,
+    Table,
+    UniqueConstraint,
+)
+from sqlalchemy.dialects.postgresql import asyncpg
+from sqlalchemy.schema import CreateIndex, CreateTable, Index, PrimaryKeyConstraint
 
 from udata_hydra import config
-from udata_hydra.analysis.csv import analyse_csv, delete_table
+from udata_hydra.analysis.csv import PYTHON_TYPE_TO_PG, PYTHON_TYPE_TO_PY, analyse_csv, delete_table
 from udata_hydra.crawl.check_resources import check_resource as crawl_check_resource
 from udata_hydra.db.resource import Resource
 from udata_hydra.logger import setup_logging
 from udata_hydra.migrations import Migrator
+from udata_hydra.utils.db import get_columns_with_indexes
 
 context = {}
 log = setup_logging()
@@ -39,7 +56,7 @@ async def connection(db_name: str = "main"):
         if db_name == "main"
         else getattr(config, f"DATABASE_URL_{db_name.upper()}")
     )
-    context["conn"][db_name] = await asyncpg.connect(
+    context["conn"][db_name] = await connect(
         dsn=dsn, server_settings={"search_path": config.DATABASE_SCHEMA}
     )
     return context["conn"][db_name]
@@ -138,7 +155,7 @@ async def crawl_url(url: str, method: str = "get"):
 @cli
 async def check_resource(resource_id: str, method: str = "get"):
     """Trigger a complete check for a given resource_id"""
-    resource: asyncpg.Record | None = await Resource.get(resource_id)
+    resource: Record | None = await Resource.get(resource_id)
     if not resource:
         log.error("Resource not found in catalog")
         return
@@ -299,6 +316,67 @@ async def purge_csv_tables():
         log.info("Nothing to delete.")
 
 
+@cli
+async def create_table(table_name: str = "TEST"):
+    try:
+        conn = await connection()
+        await conn.execute(f'DROP TABLE "{table_name}" CASCADE')
+    except Exception as e:
+        log.error(e)
+
+    metadata = MetaData()
+    table: Table = Table(table_name, metadata, Column("__id", Integer, primary_key=True))
+
+    columns = {
+        "prenom": "varchar",
+        "nom": "varchar",
+        "age": "int",
+        "siren": "varchar",
+    }
+
+    indexes = {
+        "siren": "index",
+        "nom": "index",
+    }
+
+    for col_name, col_type in columns.items():
+        table.append_column(Column(col_name, PYTHON_TYPE_TO_PG.get(col_type, String)))
+
+    query = CreateTable(table).compile(dialect=asyncpg.dialect())
+    query = query.string.replace("%%", "%")
+
+    if indexes:
+        for col_name, index_type in indexes.items():
+            if index_type not in config.SQL_INDEXES_TYPES_SUPPORTED:
+                log.error(
+                    f'Index type "{index_type}" is unknown or not supported yet! Index for column {col_name} was not created.'
+                )
+                continue
+            else:
+                log.debug(f'Add index "{index_type}" on column "{col_name}"')
+                if index_type == "index":
+                    # Create an index on the column
+                    table.append_constraint(Index(f"{table_name}_{col_name}_idx", col_name))
+
+    # Add the index creation queries to the main query
+    for index in table.indexes:
+        log.debug(f'Creating index "{index_type}" on column "{col_name}"')
+        query_index_creation = CreateIndex(index).compile(dialect=asyncpg.dialect())
+        query_index_creation = query_index_creation.string.replace("%%", "%")
+        query = query + ";" + query_index_creation
+
+    log.debug(query)
+    conn = await connection()
+    await conn.execute(query)
+
+    # Check if indexes have been created for the table
+    indexes: list[Record] | None = await get_columns_with_indexes(table_name)
+    assert indexes
+    for idx in indexes:
+        print(idx)

 @wrap
 async def cli_wrapper():
     context["conn"] = {}
37 changes: 37 additions & 0 deletions udata_hydra/utils/db.py
@@ -0,0 +1,37 @@
+from asyncpg import Record
+
+from udata_hydra import context
+
+
+async def get_columns_with_indexes(table_name: str) -> list[Record] | None:
+    """
+    Get the columns of a table which have indexes
+    Return a list of records with the following columns:
+    - table_name
+    - index_name
+    - column_name
+    """
+    pool = await context.pool()
+    async with pool.acquire() as connection:
+        q = """
+            SELECT
+                t.relname as table_name,
+                i.relname as index_name,
+                a.attname as column_name
+            FROM
+                pg_class t,
+                pg_class i,
+                pg_index ix,
+                pg_attribute a
+            WHERE
+                t.oid = ix.indrelid
+                and i.oid = ix.indexrelid
+                and a.attrelid = t.oid
+                and a.attnum = ANY(ix.indkey)
+                and t.relkind = 'r'
+                and t.relname = $1
+            ORDER BY
+                t.relname,
+                i.relname;
+        """
+        return await connection.fetch(q, table_name)
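
A short usage sketch of the helper (hypothetical table name), mirroring the call sites added in the test and CLI above:

    records = await get_columns_with_indexes("my_table")  # empty list if the table has no indexes
    if records:
        for r in records:
            print(r["table_name"], r["index_name"], r["column_name"])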
