PDOK-16629 use pydantic (#133)
* add pydantic and stop support for python 3.7

PDOK-16629

* use pydantic instead of dicts

PDOK-16629

* add support for (validating and generating) (unique) indices and foreign keys in table definitions

PDOK-16629

* add test for foreign key violation

PDOK-16629

* fix deprecated pydantic model Config

PDOK-16629

* turn on gdal exceptions in some test_table_definitions_check

PDOK-16629

* perform foreign key violations check on whole gpkg at once

PDOK-16629
roelarents authored Nov 18, 2024
1 parent 0608dcf commit 725a330
Showing 21 changed files with 800 additions and 139 deletions.
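Note: the generate.py diff below constructs pydantic models imported from geopackage_validator/models.py, one of the changed files not shown on this page. A minimal sketch of what those models plausibly look like, inferred only from how generate.py uses them; all field names, types, and defaults here are assumptions, not the contents of the real module:

# Hypothetical reconstruction of geopackage_validator/models.py, inferred
# from usage in the generate.py diff below. The real module may differ.
from typing import Optional, Tuple

from pydantic import BaseModel


class ColumnDefinition(BaseModel):
    name: str
    type: str


class IndexDefinition(BaseModel):
    columns: Tuple[str, ...]
    unique: bool = False


class ColumnMapping(BaseModel):
    src: str  # column in the referencing table
    dst: str  # column in the referenced table


class ForeignKeyDefinition(BaseModel):
    table: str  # referenced table
    columns: Tuple[ColumnMapping, ...]


class TableDefinition(BaseModel):
    name: str
    geometry_column: str
    columns: Tuple[ColumnDefinition, ...]
    indexes: Optional[Tuple[IndexDefinition, ...]] = None
    foreign_keys: Optional[Tuple[ForeignKeyDefinition, ...]] = None


class TablesDefinition(BaseModel):
    geopackage_validator_version: str
    projection: int
    tables: Tuple[TableDefinition, ...]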
6 changes: 1 addition & 5 deletions .github/workflows/pytest.yml
@@ -10,15 +10,11 @@ jobs:
     strategy:
       matrix:
         os: [ubuntu-24.04, ubuntu-22.04, ubuntu-20.04] # no ubuntugis @ ubuntu-24.04
-        python-version: ['3.11', '3.10', '3.9', '3.8', '3.7'] # , '3.6'] <- 3.6 needs setup.cfg
+        python-version: ['3.11', '3.10', '3.9', '3.8']
         gdal-version: ['3.8', '3.6', '3.4']
         exclude:
           - os: ubuntu-24.04
             python-version: '3.9'
-          - os: ubuntu-24.04
-            python-version: '3.7'
-          - os: ubuntu-24.04
-            python-version: '3.7'
           - os: ubuntu-24.04
             gdal-version: '3.6'
           - os: ubuntu-24.04
1 change: 1 addition & 0 deletions Dockerfile
@@ -1,6 +1,7 @@
 ARG GDAL_VERSION=3.9.1

 FROM ghcr.io/osgeo/gdal:alpine-normal-${GDAL_VERSION} AS base
+# docker run ghcr.io/osgeo/gdal:alpine-normal-3.9.1 python3 --version > Python 3.11.9

 LABEL maintainer="Roel van den Berg <roel.vandenberg@kadaster.nl>"
42 changes: 27 additions & 15 deletions geopackage_validator/cli.py
@@ -2,10 +2,10 @@
 """Main CLI entry for the Geopackage validator tool."""
 # Setup logging before package imports.
 import logging
-from datetime import datetime
-from pathlib import Path
 import sys
 import time
+from datetime import datetime
+from pathlib import Path

 import click
 import click_log
@@ -302,6 +302,13 @@ def geopackage_validator_command(
     is_flag=True,
     help="Output yaml",
 )
+@click.option(
+    "--with-indexes-and-fks",
+    default=False,
+    required=False,
+    is_flag=True,
+    help="Include indexes (and unique constraints) and foreign keys in the definitions",
+)
 @click.option(
     "--s3-endpoint-no-protocol",
     envvar="S3_ENDPOINT_NO_PROTOCOL",
@@ -367,17 +374,18 @@ def geopackage_validator_command(
 )
 @click_log.simple_verbosity_option(logger)
 def geopackage_validator_command_generate_table_definitions(
-    gpkg_path,
-    yaml,
-    s3_endpoint_no_protocol,
-    s3_access_key,
-    s3_secret_key,
-    s3_bucket,
-    s3_key,
-    s3_secure,
-    s3_virtual_hosting,
-    s3_signing_region,
-    s3_no_sign_request,
+    gpkg_path: Path,
+    yaml: bool,
+    with_indexes_and_fks: bool,
+    s3_endpoint_no_protocol: str,
+    s3_access_key: str,
+    s3_secret_key: str,
+    s3_bucket: str,
+    s3_key: str,
+    s3_secure: bool,
+    s3_virtual_hosting: bool,
+    s3_signing_region: str,
+    s3_no_sign_request: bool,
 ):
     gpkg_path_not_exists = s3_endpoint_no_protocol is None and (
         gpkg_path is None
@@ -399,7 +407,9 @@ def geopackage_validator_command_generate_table_definitions(
                 s3_signing_region=s3_signing_region,
                 s3_no_sign_request=s3_no_sign_request,
             )
-            definitionlist = generate.generate_definitions_for_path(gpkg_path)
+            definitionlist = generate.generate_definitions_for_path(
+                gpkg_path, with_indexes_and_fks
+            )
         else:
             with s3.minio_resource(
                 s3_endpoint_no_protocol,
@@ -409,7 +419,9 @@ def geopackage_validator_command_generate_table_definitions(
                 s3_key,
                 s3_secure,
             ) as localfilename:
-                definitionlist = generate.generate_definitions_for_path(localfilename)
+                definitionlist = generate.generate_definitions_for_path(
+                    localfilename, with_indexes_and_fks
+                )
         output.print_output(definitionlist, yaml)
     except Exception:
         logger.exception("Error while generating table definitions")
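The new flag flows from the CLI straight into generate.generate_definitions_for_path. A quick sketch of exercising it with click's test runner; the command function name is taken from the diff above, but the --gpkg-path option name is an assumption, since this page truncates the command's other options:

# Sketch: invoking the generate-table-definitions command with the new flag.
# The --gpkg-path option name is assumed, not visible in this diff.
from click.testing import CliRunner

from geopackage_validator.cli import (
    geopackage_validator_command_generate_table_definitions,
)

runner = CliRunner()
result = runner.invoke(
    geopackage_validator_command_generate_table_definitions,
    ["--gpkg-path", "example.gpkg", "--with-indexes-and-fks", "--yaml"],
)
print(result.output)  # table definitions, now including indexes and foreign keys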
154 changes: 124 additions & 30 deletions geopackage_validator/generate.py
@@ -1,20 +1,24 @@
 import logging
-from typing import Dict, List, Union
-from collections import OrderedDict
+from typing import List, Optional, Dict

 from osgeo import ogr
-from osgeo.ogr import DataSource
+from osgeo.ogr import DataSource, Layer

-from geopackage_validator import utils
 from geopackage_validator import __version__
+from geopackage_validator import utils
+from geopackage_validator.models import (
+    ColumnDefinition,
+    ColumnMapping,
+    ForeignKeyDefinition,
+    IndexDefinition,
+    TableDefinition,
+    TablesDefinition,
+)
+from geopackage_validator.utils import group_by

 logger = logging.getLogger(__name__)

-ColumnDefinition = List[Dict[str, str]]
-TableDefinition = Dict[str, Union[int, Dict[str, ColumnDefinition]]]


-def columns_definition(table, geometry_column) -> ColumnDefinition:
+def column_definitions(table, geometry_column) -> List[ColumnDefinition]:
     layer_definition = table.GetLayerDefn()

     assert layer_definition, f'Invalid Layer {"" if not table else table.GetName()}'
@@ -28,27 +32,106 @@ def columns_definition(table, geometry_column) -> ColumnDefinition:
         for column_id in range(field_count)
     ]

-    fid_column = fid_column_definition(table)
+    fid_columns = fid_column_definition(table)

-    return fid_column + [geometry_column] + columns
+    return fid_columns + [geometry_column] + columns


-def fid_column_definition(table) -> ColumnDefinition:
+def fid_column_definition(table) -> List[ColumnDefinition]:
     name = table.GetFIDColumn()
     if not name:
         return []
-    return [{"name": name, "type": "INTEGER"}]
+    return [ColumnDefinition(name=name, type="INTEGER")]
+
+
+def get_index_definitions(
+    dataset: DataSource, table_name: str
+) -> List[IndexDefinition]:
+    index_definitions: List[IndexDefinition] = []
+    index_list = dataset.ExecuteSQL(
+        f"select name, \"unique\", origin from pragma_index_list('{table_name}');"
+    )
+    pk_in_index_list = False
+    for index_listing in index_list:
+        pk_in_index_list = pk_in_index_list or index_listing["origin"] == "pk"
+        index_definitions.append(
+            IndexDefinition(
+                columns=tuple(get_index_column_names(dataset, index_listing["name"])),
+                unique=bool(int(index_listing["unique"])),
+            )
+        )
+    dataset.ReleaseResultSet(index_list)
+    index_definitions = sorted(index_definitions, key=lambda d: d.columns)
+
+    if not pk_in_index_list:
+        pk_index = get_pk_index(dataset, table_name)
+        if pk_index is not None:
+            index_definitions.insert(0, pk_index)
+
+    return index_definitions
+
+
+def get_pk_index(dataset: DataSource, table_name: str) -> Optional[IndexDefinition]:
+    pk_columns = dataset.ExecuteSQL(
+        f"select name from pragma_table_info('{table_name}') where pk;"
+    )
+    column_names = tuple(r["name"] for r in pk_columns)
+    if len(column_names) == 0:
+        return None
+    return IndexDefinition(columns=column_names, unique=True)
+
+
+def get_index_column_names(dataset: DataSource, index_name: str) -> List[str]:
+    index_info = dataset.ExecuteSQL(
+        f"select name from pragma_index_info('{index_name}');"
+    )
+    column_names: List[str] = [r["name"] for r in index_info]
+    dataset.ReleaseResultSet(index_info)
+    return column_names
+
+
+def get_foreign_key_definitions(dataset, table_name) -> List[ForeignKeyDefinition]:
+    foreign_key_list = dataset.ExecuteSQL(
+        f'select id, seq, "table", "from", "to" from pragma_foreign_key_list(\'{table_name}\');'
+    )
+    foreign_key_definitions: List[ForeignKeyDefinition] = []
+    for foreign_key_listing in group_by(foreign_key_list, lambda r: r["id"]):
+        table: str = ""
+        columns: Dict[str, str] = {}
+        for column_reference in foreign_key_listing:
+            table = column_reference["table"]
+            to = column_reference["to"]
+            if to is None:
+                pk_index = get_pk_index(dataset, column_reference["table"])
+                to = pk_index.columns[int(column_reference["seq"])]
+            columns[column_reference["from"]] = to
+        foreign_key_definitions.append(
+            ForeignKeyDefinition(
+                table=table,
+                columns=tuple(
+                    ColumnMapping(src=c[0], dst=c[1]) for c in columns.items()
+                ),
+            )
+        )
+    foreign_key_definitions = sorted(
+        foreign_key_definitions, key=lambda fk: (fk.table, (c.src for c in fk.columns))
+    )
+    dataset.ReleaseResultSet(foreign_key_list)
+    return foreign_key_definitions
+
+
-def generate_table_definitions(dataset: DataSource) -> TableDefinition:
+def generate_table_definitions(
+    dataset: DataSource, with_indexes_and_fks: bool = False
+) -> TablesDefinition:
     projections = set()
     table_geometry_types = {
         table_name: geometry_type_name
         for table_name, _, geometry_type_name in utils.dataset_geometry_tables(dataset)
     }

-    table_list = []
+    table_list: List[TableDefinition] = []
     for table in dataset:
+        table: Layer
         geo_column_name = table.GetGeometryColumn()
         if geo_column_name == "":
             continue
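An aside on the hunk above: GDAL forwards these queries to SQLite, so the index and foreign-key metadata comes straight from the pragma_index_list, pragma_index_info, pragma_table_info, and pragma_foreign_key_list table-valued functions. A standalone sketch of the same queries, with an illustrative GeoPackage path and table name:

# Sketch: inspecting indexes and foreign keys of a GeoPackage table with the
# same SQLite pragmas used above. "example.gpkg" and "my_table" are illustrative.
from osgeo import ogr

dataset = ogr.Open("example.gpkg")

indexes = dataset.ExecuteSQL(
    "select name, \"unique\", origin from pragma_index_list('my_table');"
)
for row in indexes:
    # origin is 'c' for CREATE INDEX, 'u' for a UNIQUE constraint, 'pk' for a
    # primary key that is backed by a real index
    print(row["name"], row["unique"], row["origin"])
dataset.ReleaseResultSet(indexes)

foreign_keys = dataset.ExecuteSQL(
    'select id, seq, "table", "from", "to" from pragma_foreign_key_list(\'my_table\');'
)
for row in foreign_keys:
    # "to" is NULL when the foreign key references the parent's implicit primary key
    print(row["id"], row["seq"], row["table"], row["from"], row["to"])
dataset.ReleaseResultSet(foreign_keys)

The origin column also explains the pk_in_index_list bookkeeping in get_index_definitions: SQLite only reports an index with origin "pk" when the primary key has its own index, so a rowid-alias INTEGER PRIMARY KEY table needs the separate pragma_table_info lookup in get_pk_index.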
@@ -58,35 +141,46 @@ def generate_table_definitions(dataset: DataSource) -> TableDefinition:
             "name": geo_column_name,
             "type": table_geometry_types[table_name],
         }
+        columns = tuple(column_definitions(table, geometry_column))
+
+        indexes = None
+        foreign_keys = None
+        if with_indexes_and_fks:
+            indexes = tuple(get_index_definitions(dataset, table_name))
+            foreign_keys = tuple(get_foreign_key_definitions(dataset, table_name))
+
         table_list.append(
-            OrderedDict(
-                [
-                    ("name", table_name),
-                    ("geometry_column", geo_column_name),
-                    ("columns", columns_definition(table, geometry_column)),
-                ]
+            TableDefinition(
+                name=table_name,
+                geometry_column=geo_column_name,
+                columns=columns,
+                indexes=indexes,
+                foreign_keys=foreign_keys,
             )
         )

         projections.add(table.GetSpatialRef().GetAuthorityCode(None))

     assert len(projections) == 1, "Expected one projection per geopackage."

-    result = OrderedDict(
-        [
-            ("geopackage_validator_version", __version__),
-            ("projection", int(projections.pop())),
-            ("tables", table_list),
-        ]
+    result = TablesDefinition(
+        geopackage_validator_version=__version__,
+        projection=int(projections.pop()),
+        tables=tuple(sorted(table_list, key=lambda t: t.name)),
     )

     return result


-def generate_definitions_for_path(gpkg_path: str) -> TableDefinition:
+def get_datasource_for_path(gpkg_path: str, error_handler=None) -> DataSource:
     """Starts the geopackage validation."""
     utils.check_gdal_version()
-
-    dataset = utils.open_dataset(gpkg_path)
-
-    return generate_table_definitions(dataset)
+    return utils.open_dataset(gpkg_path, error_handler)
+
+
+def generate_definitions_for_path(
+    gpkg_path: str, with_indexes_and_fks: bool = False
+) -> TablesDefinition:
+    return generate_table_definitions(
+        get_datasource_for_path(gpkg_path), with_indexes_and_fks
+    )
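Putting the generate.py changes together, and sketching the "whole gpkg at once" foreign-key check from the commit message. The validator file implementing that check is not among the diffs shown here, so the second half below only demonstrates the underlying SQLite mechanism, not the project's actual code:

# Sketch: generate definitions with indexes/FKs included, then run SQLite's
# database-wide foreign key check. File names are illustrative.
from osgeo import ogr

from geopackage_validator.generate import generate_definitions_for_path

definitions = generate_definitions_for_path("example.gpkg", with_indexes_and_fks=True)
for table in definitions.tables:
    print(table.name, table.indexes, table.foreign_keys)

# pragma_foreign_key_check without a table argument scans every table in a
# single pass, rather than issuing one query per table.
dataset = ogr.Open("example.gpkg")
violations = dataset.ExecuteSQL(
    'select "table", rowid, parent, fkid from pragma_foreign_key_check();'
)
for violation in violations:
    print(violation["table"], violation["rowid"], violation["parent"], violation["fkid"])
dataset.ReleaseResultSet(violations)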