
feat: manage large resources exceptions differently #148

Merged Sep 23, 2024 · 41 commits, all by bolinocroustibat

Commits:
f1e8f24  feat: add create resources_exceptions table migration (Aug 27, 2024)
87c4780  docs: update changelog (Aug 27, 2024)
2b744f2  feat: create migrations (Aug 27, 2024)
b022bea  feat: add create resource exception endpoint (Aug 27, 2024)
d9c4191  feat: use resources_expections table instead of LARGE_RESOURCES_EXCEP… (Aug 27, 2024)
0190b51  feat: update migration to create resources_exceptions table (Aug 27, 2024)
4508648  Merge branch 'main' into manage-resource-exceptions (Sep 2, 2024)
526a7bb  Merge branch 'main' into manage-resource-exceptions (Sep 2, 2024)
27632c7  fix: fix types (Sep 2, 2024)
18edcd1  feat: resources_exceptions.table_indexes instead of resources_excepti… (Sep 2, 2024)
5088a51  fix: fix method to check if resource is an exception, and clean Resou… (Sep 2, 2024)
39b7c28  fix: fix types (Sep 2, 2024)
fa3a963  fix: fix migration when dropping tables with foreign keys (Sep 3, 2024)
aa7240f  fix: remove inserting data in resources_exceptions table (Sep 3, 2024)
9f1950c  fix: fix project metadata loading (#157) (Sep 3, 2024)
d4fc584  fix: fix method to check if resource is an exception (Sep 3, 2024)
f1058ce  Merge branch 'main' into manage-resource-exceptions (Sep 3, 2024)
bb17714  tests: fix tests (Sep 3, 2024)
a9fd037  fix: fix loading of table_indexes column (Sep 3, 2024)
f1461da  tests: add setup_resources_exceptions in conftest and fix test_except… (Sep 3, 2024)
17c2541  tests: remove useless args (Sep 3, 2024)
5178358  fix: fix index creation logic (Sep 3, 2024)
1f4e210  docs: fix doctsring (Sep 3, 2024)
d0005d0  docs: fix docstrings (Sep 4, 2024)
37d64c9  Merge branch 'main' into manage-resource-exceptions (Sep 5, 2024)
f9c79d1  fix: fix CRUD method to insert a resource exception with dict of inde… (Sep 6, 2024)
0f06634  fix: fix create table with unique index (Sep 6, 2024)
c3a81df  feat: add list of allowed indexes (Sep 6, 2024)
61a3af5  tests: add tests (Sep 6, 2024)
ccf77dc  fix: fix index creation logic, use slugify (Sep 6, 2024)
d0cb339  fix: only one type of index type for now (Sep 6, 2024)
cae924b  tests: finish testing index creation (Sep 6, 2024)
75261f2  feat: add routes to get and delete resources exceptions (Sep 6, 2024)
dcefa29  fix: fix SQL query bug due to typo (Sep 9, 2024)
938bb78  fix: revert cli.py which was only used for testing (Sep 9, 2024)
a15fcef  Merge branch 'main' into manage-resource-exceptions (Sep 10, 2024)
d6a5a2a  Merge branch 'main' into manage-resource-exceptions (Sep 10, 2024)
0281b8e  tests: fix wrong test file location (Sep 10, 2024)
56b1aba  Merge branch 'main' into manage-resource-exceptions (Sep 11, 2024)
baabec5  Merge branch 'main' into manage-resource-exceptions (Sep 12, 2024)
c933b16  docs: better naming in SQL query (Sep 23, 2024)

Files changed:
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -62,6 +62,7 @@
- Refactor routes URLs to be more RESTful and separate legacy routes code from new routes code [#132](https://github.com/datagouv/hydra/pull/132)
- Display app version and environment in health check endpoint [#164](https://github.com/datagouv/hydra/pull/164)
- Use ENVIRONMENT from config file instead of env var [#165](https://github.com/datagouv/hydra/pull/165)
- Manage large resources exceptions differently [#148](https://github.com/datagouv/hydra/pull/148)

## 1.0.1 (2023-01-04)

405 changes: 238 additions & 167 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -30,6 +30,7 @@ minio = "7.2.7"
pyarrow = "16.1.0"
python-dateutil = "^2.8.2"
python-magic = "^0.4.25"
python-slugify = "^8.0.4"
progressist = "^0.1.0"
redis = "^4.1.4"
rq = "^1.11.1"
21 changes: 21 additions & 0 deletions tests/conftest.py
@@ -1,5 +1,6 @@
import asyncio
import hashlib
import logging
import os
import uuid
from datetime import datetime
@@ -17,17 +18,22 @@
from udata_hydra.app import app_factory
from udata_hydra.db.check import Check
from udata_hydra.db.resource import Resource
from udata_hydra.db.resource_exception import ResourceException
from udata_hydra.logger import stop_sentry

DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5433/postgres")
RESOURCE_ID = "c4e3a9fb-4415-488e-ba57-d05269b27adf"
RESOURCE_EXCEPTION_ID = "d4e3a9fb-4415-488e-ba57-d05269b27adf"
RESOURCE_EXCEPTION_TABLE_INDEXES = {"Nom": "index", "N° de certificat": "index"}
RESOURCE_URL = "https://example.com/resource-1"
DATASET_ID = "601ddcfc85a59c3a45c2435a"
NOT_EXISTING_RESOURCE_ID = "5d0b2b91-b21b-4120-83ef-83f818ba2451"
pytestmark = pytest.mark.asyncio

nest_asyncio.apply()

log = logging.getLogger("udata-hydra")


def dummy(return_value=None):
"""
@@ -135,6 +141,21 @@ def setup_catalog(catalog_content, rmock)
run("load_catalog", url=catalog)


@pytest.fixture
async def setup_catalog_with_resource_exception(setup_catalog):
"""Setup a catalog with a resource that is too large to be processed
Columns for the resource RESOURCE_EXCEPTION_ID:
['__id', 'Nom', 'Prenom', 'Societe', 'Adresse', 'CP', 'Ville', 'Tel1', 'Tel2', 'email', 'Organisme', 'Org Cofrac', 'Type de certificat', 'N° de certificat', 'Date début validité', 'Date fin validité']
"""
await Resource.insert(
dataset_id=DATASET_ID, resource_id=RESOURCE_EXCEPTION_ID, url="http://example.com/"
)
await ResourceException.insert(
resource_id=RESOURCE_EXCEPTION_ID,
table_indexes=RESOURCE_EXCEPTION_TABLE_INDEXES,
)


@pytest.fixture
def produce_mock(mocker):
mocker.patch("udata_hydra.crawl.process_check_data.send", dummy())
51 changes: 5 additions & 46 deletions tests/test_analysis/test_analysis_csv.py
@@ -8,7 +8,6 @@
from yarl import URL

from tests.conftest import RESOURCE_ID, RESOURCE_URL
from udata_hydra import config
from udata_hydra.analysis.csv import analyse_csv, csv_to_db
from udata_hydra.db.resource import Resource

@@ -85,46 +84,6 @@ async def test_analyse_csv_big_file(setup_catalog, rmock, db, fake_check, produce_mock):
assert profile["total_lines"] == expected_count


async def test_exception_analysis(setup_catalog, rmock, db, fake_check, produce_mock):
"""
Tests that exception resources (files that are too large to be normally processed) are indeed processed.
"""
save_config = config.MAX_FILESIZE_ALLOWED
config.override(MAX_FILESIZE_ALLOWED={"csv": 5000})
await db.execute(
f"UPDATE catalog SET resource_id = '{config.LARGE_RESOURCES_EXCEPTIONS[0]}' WHERE id=1"
)
check = await fake_check(resource_id=config.LARGE_RESOURCES_EXCEPTIONS[0])
filename, expected_count = ("20190618-annuaire-diagnostiqueurs.csv", 45522)
url = check["url"]
table_name = hashlib.md5(url.encode("utf-8")).hexdigest()
with open(f"tests/data/{filename}", "rb") as f:
data = f.read()
rmock.get(url, status=200, body=data)

# Check resource status before analysis
resource = await Resource.get(config.LARGE_RESOURCES_EXCEPTIONS[0])
assert resource["status"] is None

# Analyse the CSV
await analyse_csv(check_id=check["id"])

# Check resource status after analysis
resource = await Resource.get(config.LARGE_RESOURCES_EXCEPTIONS[0])
assert resource["status"] is None

count = await db.fetchrow(f'SELECT count(*) AS count FROM "{table_name}"')
assert count["count"] == expected_count
profile = await db.fetchrow(
"SELECT csv_detective FROM tables_index WHERE resource_id = $1", check["resource_id"]
)
profile = json.loads(profile["csv_detective"])
for attr in ("header", "columns", "formats", "profile"):
assert profile[attr]
assert profile["total_lines"] == expected_count
config.override(MAX_FILESIZE_ALLOWED=save_config)


@pytest.mark.parametrize(
"line_expected",
(
@@ -155,7 +114,7 @@ async def test_csv_to_db_simple_type_casting(db, line_expected, clean_db):
"header": list(columns.keys()),
"columns": columns,
}
await csv_to_db(fp.name, inspection, "test_table")
await csv_to_db(file_path=fp.name, inspection=inspection, table_name="test_table")
res = list(await db.fetch("SELECT * FROM test_table"))
assert len(res) == 1
cols = ["__id", "int", "float", "string", "bool"]
@@ -200,7 +159,7 @@ async def test_csv_to_db_complex_type_casting(db, line_expected, clean_db):
"columns": columns,
}
# Insert the data
await csv_to_db(fp.name, inspection, "test_table")
await csv_to_db(file_path=fp.name, inspection=inspection, table_name="test_table")
res = list(await db.fetch("SELECT * FROM test_table"))
assert len(res) == 1
cols = ["__id", "json", "date", "datetime"]
@@ -227,7 +186,7 @@ async def test_basic_sql_injection(db, clean_db):
"columns": columns,
}
# Insert the data
await csv_to_db(fp.name, inspection, "test_table")
await csv_to_db(file_path=fp.name, inspection=inspection, table_name="test_table")
res = await db.fetchrow("SELECT * FROM test_table")
assert res[injection] == "test"

@@ -249,7 +208,7 @@ async def test_percentage_column(db, clean_db):
"columns": columns,
}
# Insert the data
await csv_to_db(fp.name, inspection, "test_table")
await csv_to_db(file_path=fp.name, inspection=inspection, table_name="test_table")
res = await db.fetchrow("SELECT * FROM test_table")
assert res["% mon pourcent"] == "test"

@@ -271,7 +230,7 @@ async def test_reserved_column_name(db, clean_db):
"columns": columns,
}
# Insert the data
await csv_to_db(fp.name, inspection, "test_table")
await csv_to_db(file_path=fp.name, inspection=inspection, table_name="test_table")
res = await db.fetchrow("SELECT * FROM test_table")
assert res["xmin__hydra_renamed"] == "test"

72 changes: 72 additions & 0 deletions tests/test_analysis/test_analysis_csv_exceptions.py
@@ -0,0 +1,72 @@
import hashlib
import json
import logging

import pytest
from asyncpg import Record

from tests.conftest import RESOURCE_EXCEPTION_ID, RESOURCE_EXCEPTION_TABLE_INDEXES
from udata_hydra import config
from udata_hydra.analysis.csv import analyse_csv
from udata_hydra.db.resource import Resource
from udata_hydra.db.resource_exception import ResourceException
from udata_hydra.utils.db import get_columns_with_indexes

pytestmark = pytest.mark.asyncio


log = logging.getLogger("udata-hydra")


async def test_exception_analysis(
setup_catalog_with_resource_exception, rmock, db, fake_check, produce_mock
):
"""
Tests that exception resources (files that are too large to be normally processed) are indeed processed.
"""
# Change config to accept large files
save_config = config.MAX_FILESIZE_ALLOWED
config.override(MAX_FILESIZE_ALLOWED={"csv": 5000})
Review comment from a contributor, on the config.override call above:

Would it make sense to have config be a context manager, to make sure that its value gets restored to the original even when the test fails (and thus the end of its code doesn't get executed)? Maybe make it a fixture?

Reply from the author (bolinocroustibat):

Yes, that would indeed be easier, good idea. I'll keep that for another PR.
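
A minimal sketch of the fixture the reviewer suggests, for illustration only (the fixture name is hypothetical; it assumes config.override and MAX_FILESIZE_ALLOWED behave as used in this test):

import pytest

from udata_hydra import config


@pytest.fixture
def allow_big_csv():
    # Hypothetical fixture: raise the CSV size limit for the test,
    # then restore the saved value even if the test body fails
    # (pytest runs the code after `yield` as teardown regardless).
    saved = config.MAX_FILESIZE_ALLOWED
    config.override(MAX_FILESIZE_ALLOWED={"csv": 5000})
    yield
    config.override(MAX_FILESIZE_ALLOWED=saved)

A test would then simply request allow_big_csv instead of saving and restoring the config inline.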


# Create a previous fake check for the resource
check = await fake_check(resource_id=RESOURCE_EXCEPTION_ID)
filename, expected_count = ("20190618-annuaire-diagnostiqueurs.csv", 45522)
url = check["url"]
table_name = hashlib.md5(url.encode("utf-8")).hexdigest()
with open(f"tests/data/{filename}", "rb") as f:
data = f.read()
rmock.get(url, status=200, body=data)

# Check resource status before analysis
resource = await Resource.get(RESOURCE_EXCEPTION_ID)
assert resource["status"] is None

# Analyse the CSV
await analyse_csv(check_id=check["id"])

# Check resource status after analysis
resource = await Resource.get(RESOURCE_EXCEPTION_ID)
assert resource["status"] is None

# Check the table has been created in CSV DB, with the expected number of rows, and get the columns
row: Record = await db.fetchrow(f'SELECT *, count(*) over () AS count FROM "{table_name}"')
assert row["count"] == expected_count

# Check if indexes have been created for the table
expected_columns_with_indexes = list(RESOURCE_EXCEPTION_TABLE_INDEXES.keys())
expected_columns_with_indexes.append("__id")
indexes: list[Record] | None = await get_columns_with_indexes(table_name)
assert indexes
for idx in indexes:
assert idx["table_name"] == table_name
assert idx["column_name"] in expected_columns_with_indexes

# Check the profile has been saved in the tables_index
profile = await db.fetchrow(
"SELECT csv_detective FROM tables_index WHERE resource_id = $1", check["resource_id"]
)
profile = json.loads(profile["csv_detective"])
for attr in ("header", "columns", "formats", "profile"):
assert profile[attr]
assert profile["total_lines"] == expected_count
config.override(MAX_FILESIZE_ALLOWED=save_config)
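
The helper get_columns_with_indexes imported above from udata_hydra.utils.db is not part of this diff. A plausible sketch of such a helper, assuming the context.pool("csv") accessor used elsewhere in this PR and the standard PostgreSQL catalog tables (the actual implementation may differ):

from asyncpg import Record

from udata_hydra import context


async def get_columns_with_indexes(table_name: str) -> list[Record] | None:
    """Return one row per indexed column of the given table, with
    table_name, index_name and column_name keys, or None if there are none."""
    q = """
        SELECT t.relname AS table_name,
               i.relname AS index_name,
               a.attname AS column_name
        FROM pg_class t
        JOIN pg_index ix ON t.oid = ix.indrelid
        JOIN pg_class i ON i.oid = ix.indexrelid
        JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey)
        WHERE t.relkind = 'r' AND t.relname = $1
    """
    db = await context.pool("csv")
    rows = await db.fetch(q, table_name)
    return rows or None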
3 changes: 1 addition & 2 deletions tests/test_parquet_export.py
@@ -1,5 +1,4 @@
from io import BytesIO
from typing import Optional

import pyarrow.parquet as pq
import pytest
@@ -27,7 +26,7 @@ async def test_parquet_conversion(
):
filename, expected_count = file_and_count
file_path = f"tests/data/{filename}"
inspection: Optional[dict] = await perform_csv_inspection(file_path)
inspection: dict | None = await perform_csv_inspection(file_path)
assert inspection
columns = inspection["columns"]
columns = {
53 changes: 45 additions & 8 deletions udata_hydra/analysis/csv.py
@@ -12,6 +12,7 @@
from csv_detective.detection import engine_to_file
from csv_detective.explore_csv import routine as csv_detective_routine
from progressist import ProgressBar
from slugify import slugify
from sqlalchemy import (
JSON,
BigInteger,
@@ -26,7 +27,7 @@
Table,
)
from sqlalchemy.dialects.postgresql import asyncpg
from sqlalchemy.schema import CreateTable
from sqlalchemy.schema import CreateIndex, CreateTable, Index
from str2bool import str2bool
from str2float import str2float

@@ -36,6 +37,7 @@
from udata_hydra.db import compute_insert_query
from udata_hydra.db.check import Check
from udata_hydra.db.resource import Resource
from udata_hydra.db.resource_exception import ResourceException
from udata_hydra.utils import Reader, Timer, download_resource, queue, send
from udata_hydra.utils.minio import MinIOClient
from udata_hydra.utils.parquet import save_as_parquet
@@ -121,12 +123,14 @@ async def analyse_csv(
# Update resource status to ANALYSING_CSV
await Resource.update(resource_id, {"status": "ANALYSING_CSV"})

exceptions = config.LARGE_RESOURCES_EXCEPTIONS
# Check if the resource is in the exceptions table
# If it is, get the table_indexes to use them later
exception: Record | None = await ResourceException.get_by_resource_id(resource_id)
table_indexes: dict | None = json.loads(exception["table_indexes"]) if exception else None

timer = Timer("analyse-csv")
assert any(_ is not None for _ in (check_id, url))
url: str = check.get("url") or url
exception_file = str(check.get("resource_id", "")) in exceptions

headers = json.loads(check.get("headers") or "{}")
tmp_file = (
@@ -135,7 +139,7 @@
else await download_resource(
url=url,
headers=headers,
max_size_allowed=None if exception_file else int(config.MAX_FILESIZE_ALLOWED["csv"]),
max_size_allowed=None if exception else int(config.MAX_FILESIZE_ALLOWED["csv"]),
)
)
table_name = hashlib.md5(url.encode("utf-8")).hexdigest()
@@ -151,6 +155,7 @@
file_path=tmp_file.name,
inspection=csv_inspection,
table_name=table_name,
table_indexes=table_indexes,
resource_id=resource_id,
debug_insert=debug_insert,
)
@@ -207,17 +212,45 @@ def smart_cast(_type: str, value, failsafe: bool = False) -> Any:
return None


def compute_create_table_query(table_name: str, columns: list) -> str:
def compute_create_table_query(
table_name: str, columns: dict, indexes: dict[str, str] | None = None
) -> str:
"""Use sqlalchemy to build a CREATE TABLE statement that should not be vulnerable to injections"""

metadata = MetaData()
table = Table(table_name, metadata, Column("__id", Integer, primary_key=True))

for col_name, col_type in columns.items():
table.append_column(Column(col_name, PYTHON_TYPE_TO_PG.get(col_type, String)))
compiled = CreateTable(table).compile(dialect=asyncpg.dialect())

if indexes:
for col_name, index_type in indexes.items():
if index_type not in config.SQL_INDEXES_TYPES_SUPPORTED:
log.error(
f'Index type "{index_type}" is unknown or not supported yet! Index for column {col_name} was not created.'
)
continue

else:
if index_type == "index":
index_name = f"{table_name}_{slugify(col_name)}_idx"
table.append_constraint(Index(index_name, col_name))
# TODO: other index types. Not easy with sqlalchemy, maybe use raw sql?

compiled_query = CreateTable(table).compile(dialect=asyncpg.dialect())
query: str = compiled_query.string

# Add the index creation queries to the main query
for index in table.indexes:
log.debug(f'Creating index "{index.name}" on table "{table_name}"')
query_idx = CreateIndex(index).compile(dialect=asyncpg.dialect())
query: str = query + ";" + query_idx.string

# compiled query will want to write "%% mon pourcent" VARCHAR but will fail when querying "% mon pourcent"
# also, "% mon pourcent" works well in pg as a column
# TODO: dirty hack, maybe find an alternative
return compiled.string.replace("%%", "%")
query = query.replace("%%", "%")
return query


def generate_records(file_path: str, inspection: dict, columns: dict) -> Iterator[list]:
Expand Down Expand Up @@ -269,6 +302,7 @@ async def csv_to_db(
file_path: str,
inspection: dict,
table_name: str,
table_indexes: dict[str, str] | None = None,
resource_id: str | None = None,
debug_insert: bool = False,
) -> None:
@@ -302,8 +336,11 @@ async def csv_to_db(
q = f'DROP TABLE IF EXISTS "{table_name}"'
db = await context.pool("csv")
await db.execute(q)
q = compute_create_table_query(table_name, columns)

# Create table
q = compute_create_table_query(table_name=table_name, columns=columns, indexes=table_indexes)
await db.execute(q)

# this use postgresql COPY from an iterator, it's fast but might be difficult to debug
if not debug_insert:
# NB: also see copy_to_table for a file source
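
For illustration, here is a standalone sketch of the CreateTable/CreateIndex compilation technique that compute_create_table_query above relies on (table and column names are made up; assumes SQLAlchemy with its built-in asyncpg dialect):

from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects.postgresql import asyncpg
from sqlalchemy.schema import CreateIndex, CreateTable, Index

metadata = MetaData()
# "__id" mirrors the surrogate primary key the function always adds
table = Table("demo_table", metadata, Column("__id", Integer, primary_key=True))
table.append_column(Column("Nom", String))
# A plain B-tree index, the only index type the PR supports for now
table.append_constraint(Index("demo_table_nom_idx", "Nom"))

ddl = CreateTable(table).compile(dialect=asyncpg.dialect()).string
for index in table.indexes:
    ddl += ";" + CreateIndex(index).compile(dialect=asyncpg.dialect()).string
print(ddl)  # CREATE TABLE "demo_table" (...); CREATE INDEX demo_table_nom_idx ...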