diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index e2049f7e02..d9408c786a 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -13,7 +13,9 @@ on:
 
 env:
   DOCKER_USER: gadockersvc
-  DOCKER_IMAGE: opendatacube/datacube-tests:latest
+  DOCKER_IMAGE: opendatacube/datacube-tests:latest1.9
+  # NB Restore standard image name when merged into develop
+  # DOCKER_IMAGE: opendatacube/datacube-tests:latest
 
 
 jobs:
@@ -63,7 +65,6 @@ jobs:
           password: ${{ secrets.GADOCKERSVC_PASSWORD }}
 
       - name: Build Docker
-        if: steps.changes.outputs.docker == 'true'
         uses: docker/build-push-action@v4
         with:
           file: docker/Dockerfile
@@ -92,12 +93,16 @@ jobs:
           EOF
 
       - name: DockerHub Push
-        if: steps.changes.outputs.docker == 'true'
+        if: |
+          github.event_name == 'push'
+          && github.ref == 'refs/heads/develop'
+          && steps.changes.outputs.docker == 'true'
        uses: docker/build-push-action@v4
        with:
+          file: docker/Dockerfile
          context: .
          push: true
-          tags: ${DOCKER_IMAGE}
+          tags: ${{env.DOCKER_IMAGE}}
 
       - name: Build Packages
         run: |
@@ -135,7 +140,6 @@ jobs:
           TWINE_PASSWORD: ${{ secrets.PyPiToken }}
 
       - name: Upload coverage to Codecov
-        if: steps.cfg.outputs.primary == 'yes'
         uses: codecov/codecov-action@v3
         with:
           file: ./coverage.xml
diff --git a/conda-environment.yml b/conda-environment.yml
index 960faf44b1..553037c92a 100644
--- a/conda-environment.yml
+++ b/conda-environment.yml
@@ -32,7 +32,7 @@ dependencies:
 - pyyaml
 - rasterio >=1.3.2
 - ruamel.yaml
-- sqlalchemy <2.0
+- sqlalchemy >=2.0
 - GeoAlchemy2
 - xarray >=0.9
 - toolz
diff --git a/datacube/drivers/postgis/_api.py b/datacube/drivers/postgis/_api.py
index 6e7edb0571..6d25ee21a9 100644
--- a/datacube/drivers/postgis/_api.py
+++ b/datacube/drivers/postgis/_api.py
@@ -391,9 +391,9 @@ def spatial_extent(self, ids, crs):
         if SpatialIndex is None:
             return None
         result = self._connection.execute(
-            select([
+            select(
                 func.ST_AsGeoJSON(func.ST_Union(SpatialIndex.extent))
-            ]).select_from(
+            ).select_from(
                 SpatialIndex
             ).where(
                 SpatialIndex.dataset_ref.in_(ids)
@@ -442,7 +442,7 @@ def get_datasets_for_location(self, uri, mode=None):
         return self._connection.execute(
             select(
-                _dataset_select_fields()
+                *_dataset_select_fields()
             ).join(
                 Dataset.locations
             ).where(
@@ -619,7 +619,7 @@ def search_datasets_query(self,
         raw_expressions = PostgisDbAPI._alchemify_expressions(expressions)
         join_tables = PostgisDbAPI._join_tables(expressions, select_fields)
         where_expr = and_(Dataset.archived == None, *raw_expressions)
-        query = select(select_columns).select_from(Dataset)
+        query = select(*select_columns).select_from(Dataset)
         for joins in join_tables:
             query = query.join(*joins)
         if spatialquery is not None:
@@ -739,7 +739,8 @@ def get_duplicates(self, match_fields: Sequence[PgField], expressions: Sequence[
         join_tables = PostgisDbAPI._join_tables(expressions, match_fields)
         query = select(
-            (func.array_agg(Dataset.id),) + group_expressions
+            func.array_agg(Dataset.id),
+            *group_expressions
         ).select_from(Dataset)
         for joins in join_tables:
             query = query.join(*joins)
@@ -792,24 +793,24 @@ def count_datasets_through_time(self, start, end, period, time_field, expression
 
     def count_datasets_through_time_query(self, start, end, period, time_field, expressions):
         raw_expressions = self._alchemify_expressions(expressions)
 
-        start_times = select((
+        start_times = select(
             func.generate_series(start, end, cast(period, INTERVAL)).label('start_time'),
-        )).alias('start_times')
+        ).alias('start_times')
 
         time_range_select = (
-            select((
+            select(
                 func.tstzrange(
                     start_times.c.start_time,
                     func.lead(start_times.c.start_time).over()
                 ).label('time_period'),
-            ))
+            )
         ).alias('all_time_ranges')
 
         # Exclude the trailing (end time to infinite) row. Is there a simpler way?
         time_ranges = (
-            select((
+            select(
                 time_range_select,
-            )).where(
+            ).where(
                 ~func.upper_inf(time_range_select.c.time_period)
             )
         ).alias('time_ranges')
@@ -826,7 +827,7 @@ def count_datasets_through_time_query(self, start, end, period, time_field, expr
             )
         )
 
-        return select((time_ranges.c.time_period, count_query.label('dataset_count')))
+        return select(time_ranges.c.time_period, count_query.label('dataset_count'))
 
     def update_search_index(self, product_names: Sequence[str] = [], dsids: Sequence[DSID] = []):
         """
@@ -1287,9 +1288,9 @@ def get_all_relations(self, dsids: Iterable[uuid.UUID]) -> Iterable[LineageRelat
             ))
         )
         for rel in results:
-            yield LineageRelation(classifier=rel["classifier"],
-                                  source_id=rel["source_dataset_ref"],
-                                  derived_id=rel["derived_dataset_ref"])
+            yield LineageRelation(classifier=rel.classifier,
+                                  source_id=rel.source_dataset_ref,
+                                  derived_id=rel.derived_dataset_ref)
 
     def write_relations(self, relations: Iterable[LineageRelation], allow_updates: bool):
         """
@@ -1366,9 +1367,9 @@ def load_lineage_relations(self,
             next_lvl_ids = set()
             results = self._connection.execute(qry)
             for row in results:
-                rel = LineageRelation(classifier=row["classifier"],
-                                      source_id=row["source_dataset_ref"],
-                                      derived_id=row["derived_dataset_ref"])
+                rel = LineageRelation(classifier=row.classifier,
+                                      source_id=row.source_dataset_ref,
+                                      derived_id=row.derived_dataset_ref)
                 relations.append(rel)
                 if direction == LineageDirection.SOURCES:
                     next_id = rel.source_id
diff --git a/datacube/drivers/postgis/_core.py b/datacube/drivers/postgis/_core.py
index b0922bd652..7567805060 100644
--- a/datacube/drivers/postgis/_core.py
+++ b/datacube/drivers/postgis/_core.py
@@ -73,70 +73,65 @@ def ensure_db(engine, with_permissions=True):
 
     Create the schema if it doesn't exist.
     """
-    is_new = False
-    c = engine.connect()
-
-    quoted_db_name, quoted_user = _get_quoted_connection_info(c)
-
-    _ensure_extension(c, 'POSTGIS')
-
-    if with_permissions:
-        _LOG.info('Ensuring user roles.')
-        _ensure_role(c, 'odc_user')
-        _ensure_role(c, 'odc_ingest', inherits_from='odc_user')
-        _ensure_role(c, 'odc_manage', inherits_from='odc_ingest')
-        _ensure_role(c, 'odc_admin', inherits_from='odc_manage', add_user=True)
-
-        c.execute(text(f"""
-        grant all on database {quoted_db_name} to odc_admin;
-        """))
-
-    if not has_schema(engine):
-        is_new = True
-        try:
-            # TODO: Switch to SQLAlchemy-2.0/Future style connections and transactions.
+    is_new = not has_schema(engine)
+    with engine.connect() as c:
+        # NB. Using default SQLA2.0 auto-begin commit-as-you-go behaviour
+        quoted_db_name, quoted_user = _get_quoted_connection_info(c)
+
+        _ensure_extension(c, 'POSTGIS')
+        c.commit()
+
+        if with_permissions:
+            _LOG.info('Ensuring user roles.')
+            _ensure_role(c, 'odc_user')
+            _ensure_role(c, 'odc_ingest', inherits_from='odc_user')
+            _ensure_role(c, 'odc_manage', inherits_from='odc_ingest')
+            _ensure_role(c, 'odc_admin', inherits_from='odc_manage', add_user=True)
+
+            c.execute(text(f"""
+            grant all on database {quoted_db_name} to odc_admin;
+            """))
+            c.commit()
+
+        if is_new:
             sqla_txn = c.begin()
             if with_permissions:
                 # Switch to 'odc_admin', so that all items are owned by them.
                 c.execute(text('set role odc_admin'))
             _LOG.info('Creating schema.')
             c.execute(CreateSchema(SCHEMA_NAME))
-            _LOG.info('Creating tables.')
+            _LOG.info('Creating types.')
             c.execute(text(TYPES_INIT_SQL))
             from ._schema import orm_registry, ALL_STATIC_TABLES
+            _LOG.info('Creating tables.')
             _LOG.info("Dataset indexes: %s", repr(orm_registry.metadata.tables["odc.dataset"].indexes))
             orm_registry.metadata.create_all(c, tables=ALL_STATIC_TABLES)
             _LOG.info("Creating triggers.")
             install_timestamp_trigger(c)
             sqla_txn.commit()
-        except:  # noqa: E722
-            _LOG.error("Unhandled SQLAlchemy error.")
-            sqla_txn.rollback()
-            raise
-        finally:
             if with_permissions:
                 c.execute(text(f'set role {quoted_user}'))
-
-    if with_permissions:
-        _LOG.info('Adding role grants.')
-        c.execute(text(f"""
-        grant usage on schema {SCHEMA_NAME} to odc_user;
-        grant select on all tables in schema {SCHEMA_NAME} to odc_user;
-        grant execute on function {SCHEMA_NAME}.common_timestamp(text) to odc_user;
-
-        grant insert on {SCHEMA_NAME}.dataset,
-                        {SCHEMA_NAME}.location,
-                        {SCHEMA_NAME}.dataset_lineage to odc_ingest;
-        grant usage, select on all sequences in schema {SCHEMA_NAME} to odc_ingest;
-
-        -- (We're only granting deletion of types that have nothing written yet: they can't delete the data itself)
-        grant insert, delete on {SCHEMA_NAME}.product,
-                                {SCHEMA_NAME}.metadata_type to odc_manage;
-        -- Allow creation of indexes, views
-        grant create on schema {SCHEMA_NAME} to odc_manage;
-        """))
-
-    c.close()
+            c.commit()
+
+        if with_permissions:
+            _LOG.info('Adding role grants.')
+            c.execute(text(f"""
+            grant usage on schema {SCHEMA_NAME} to odc_user;
+            grant select on all tables in schema {SCHEMA_NAME} to odc_user;
+            grant execute on function {SCHEMA_NAME}.common_timestamp(text) to odc_user;
+
+            grant insert on {SCHEMA_NAME}.dataset,
+                            {SCHEMA_NAME}.location,
+                            {SCHEMA_NAME}.dataset_lineage to odc_ingest;
+            grant usage, select on all sequences in schema {SCHEMA_NAME} to odc_ingest;
+
+            -- (We're only granting deletion of types that have nothing written yet: they can't delete the data itself)
+            grant insert, delete on {SCHEMA_NAME}.product,
+                                    {SCHEMA_NAME}.metadata_type to odc_manage;
+            -- Allow creation of indexes, views
+            grant create on schema {SCHEMA_NAME} to odc_manage;
+            """))
+            c.commit()
 
     return is_new
diff --git a/datacube/drivers/postgis/sql.py b/datacube/drivers/postgis/sql.py
index 321f97b9d7..bdb4ff06de 100644
--- a/datacube/drivers/postgis/sql.py
+++ b/datacube/drivers/postgis/sql.py
@@ -7,7 +7,8 @@
 """
 
 from sqlalchemy import TIMESTAMP, text
-from sqlalchemy.dialects.postgresql.ranges import RangeOperators
+from sqlalchemy.types import Double
+from sqlalchemy.dialects.postgresql.ranges import AbstractRange, Range
 from sqlalchemy.ext.compiler import compiles
 from sqlalchemy.sql import sqltypes
 from sqlalchemy.sql.expression import Executable, ClauseElement
@@ -74,7 +75,7 @@ def visit_create_view(element, compiler, **kw):
 
 
 # pylint: disable=abstract-method
-class FLOAT8RANGE(RangeOperators, sqltypes.TypeEngine):
+class FLOAT8RANGE(AbstractRange[Range[Double]]):
     __visit_name__ = 'FLOAT8RANGE'
 
 
diff --git a/datacube/drivers/postgres/_api.py b/datacube/drivers/postgres/_api.py
index 0ff635d7f8..36a1290df7 100644
--- a/datacube/drivers/postgres/_api.py
+++ b/datacube/drivers/postgres/_api.py
@@ -333,7 +333,7 @@ def get_datasets_for_location(self, uri, mode=None):
         return self._connection.execute(
             select(
-                _DATASET_SELECT_FIELDS
+                *_DATASET_SELECT_FIELDS
             ).select_from(
                 DATASET_LOCATION.join(DATASET)
             ).where(
@@ -482,7 +482,7 @@ def search_datasets_by_metadata(self, metadata):
         """
         # Find any storage types whose 'dataset_metadata' document is a subset of the metadata.
         return self._connection.execute(
-            select(_DATASET_SELECT_FIELDS).where(DATASET.c.metadata.contains(metadata))
+            select(*_DATASET_SELECT_FIELDS).where(DATASET.c.metadata.contains(metadata))
         ).fetchall()
 
     def search_products_by_metadata(self, metadata):
@@ -530,7 +530,7 @@ def search_datasets_query(expressions, source_exprs=None,
         # Include the IDs of source datasets
         select_columns += (
             select(
-                (func.array_agg(DATASET_SOURCE.c.source_dataset_ref),)
+                func.array_agg(DATASET_SOURCE.c.source_dataset_ref)
             ).select_from(
                 DATASET_SOURCE
             ).where(
@@ -571,15 +571,18 @@ def search_datasets_query(expressions, source_exprs=None,
             )
         ).cte(name="base_query", recursive=True)
 
+        rq_select_cols = [
+            col
+            for col in base_query.columns
+            if col.name not in ['source_dataset_ref', 'distance', 'path']
+        ] + [
+            DATASET_SOURCE.c.source_dataset_ref,
+            (base_query.c.distance + 1).label('distance'),
+            (base_query.c.path + '.' + DATASET_SOURCE.c.classifier).label('path')
+        ]
         recursive_query = base_query.union_all(
             select(
-                [col for col in base_query.columns
-                 if col.name not in ['source_dataset_ref', 'distance', 'path']
-                 ] + [
-                    DATASET_SOURCE.c.source_dataset_ref,
-                    (base_query.c.distance + 1).label('distance'),
-                    (base_query.c.path + '.' + DATASET_SOURCE.c.classifier).label('path')
-                ]
+                *rq_select_cols
             ).select_from(
                 base_query.join(
                     DATASET_SOURCE, base_query.c.source_dataset_ref == DATASET_SOURCE.c.dataset_ref
@@ -589,10 +592,11 @@ def search_datasets_query(expressions, source_exprs=None,
 
         return (
             select(
-                [distinct(recursive_query.c.id)
-                 ] + [
+                distinct(recursive_query.c.id),
+                *[
                     col for col in recursive_query.columns
-                    if col.name not in ['id', 'source_dataset_ref', 'distance', 'path']]
+                    if col.name not in ['id', 'source_dataset_ref', 'distance', 'path']
+                ]
             ).select_from(
                 recursive_query.join(DATASET, DATASET.c.id == recursive_query.c.source_dataset_ref)
             ).where(
@@ -718,9 +722,9 @@ def search_unique_datasets_query(expressions, select_fields, limit):
             if field.name in {'uri', 'uris'}:
                 # All active URIs, from newest to oldest
                 uris_field = func.array(
-                    select([
+                    select(
                         _dataset_uri_field(SELECTED_DATASET_LOCATION)
-                    ]).where(
+                    ).where(
                         and_(
                             SELECTED_DATASET_LOCATION.c.dataset_ref == DATASET.c.id,
                             SELECTED_DATASET_LOCATION.c.archived == None
@@ -746,7 +750,7 @@ def search_unique_datasets_query(expressions, select_fields, limit):
 
     return (
         select(
-            select_columns
+            *select_columns
        ).select_from(
            from_expression
        ).where(
@@ -774,7 +778,8 @@ def get_duplicates(self, match_fields: Iterable[PgField], expressions: Iterable[
         group_expressions = tuple(f.alchemy_expression for f in match_fields)
 
         select_query = select(
-            (func.array_agg(DATASET.c.id),) + group_expressions
+            func.array_agg(DATASET.c.id),
+            *group_expressions
         ).select_from(
             PostgresDbAPI._from_expression(DATASET, expressions, match_fields)
         ).where(
@@ -796,7 +801,7 @@ def count_datasets(self, expressions):
 
         select_query = (
             select(
-                [func.count('*')]
+                func.count('*')
             ).select_from(
                 self._from_expression(DATASET, expressions)
             ).where(
@@ -826,31 +831,31 @@ def count_datasets_through_time(self, start, end, period, time_field, expression
 
     def count_datasets_through_time_query(self, start, end, period, time_field, expressions):
         raw_expressions = self._alchemify_expressions(expressions)
 
-        start_times = select((
+        start_times = select(
             func.generate_series(start, end, cast(period, INTERVAL)).label('start_time'),
-        )).alias('start_times')
+        ).alias('start_times')
 
         time_range_select = (
-            select((
+            select(
                 func.tstzrange(
                     start_times.c.start_time,
                     func.lead(start_times.c.start_time).over()
                 ).label('time_period'),
-            ))
+            )
         ).alias('all_time_ranges')
 
         # Exclude the trailing (end time to infinite) row. Is there a simpler way?
         time_ranges = (
-            select((
+            select(
                 time_range_select,
-            )).where(
+            ).where(
                 ~func.upper_inf(time_range_select.c.time_period)
            )
         ).alias('time_ranges')
 
         count_query = (
             select(
-                (func.count('*'),)
+                func.count('*')
             ).select_from(
                 self._from_expression(DATASET, expressions)
             ).where(
@@ -862,7 +867,7 @@ def count_datasets_through_time_query(self, start, end, period, time_field, expr
             )
         )
 
-        return select((time_ranges.c.time_period, count_query.label('dataset_count')))
+        return select(time_ranges.c.time_period, count_query.label('dataset_count'))
 
     @staticmethod
     def _from_expression(source_table, expressions=None, fields=None):
@@ -1092,9 +1097,9 @@ def get_locations(self, dataset_id):
         return [
             record[0]
             for record in self._connection.execute(
-                select([
+                select(
                     _dataset_uri_field(DATASET_LOCATION)
-                ]).where(
+                ).where(
                     and_(DATASET_LOCATION.c.dataset_ref == dataset_id, DATASET_LOCATION.c.archived == None)
                 ).order_by(
                     DATASET_LOCATION.c.added.desc(),
@@ -1110,9 +1115,9 @@ def get_archived_locations(self, dataset_id):
         return [
             (location_uri, archived_time)
             for location_uri, archived_time in self._connection.execute(
-                select([
+                select(
                     _dataset_uri_field(DATASET_LOCATION), DATASET_LOCATION.c.archived
-                ]).where(
+                ).where(
                     and_(DATASET_LOCATION.c.dataset_ref == dataset_id, DATASET_LOCATION.c.archived != None)
                 ).order_by(
                     DATASET_LOCATION.c.added.desc()
diff --git a/datacube/drivers/postgres/_core.py b/datacube/drivers/postgres/_core.py
index 52cd926966..94a5aa9c4a 100644
--- a/datacube/drivers/postgres/_core.py
+++ b/datacube/drivers/postgres/_core.py
@@ -81,66 +81,60 @@ def ensure_db(engine, with_permissions=True):
 
     Create the schema if it doesn't exist.
     """
-    is_new = False
-    c = engine.connect()
-
-    quoted_db_name, quoted_user = _get_quoted_connection_info(c)
-
-    if with_permissions:
-        _LOG.info('Ensuring user roles.')
-        _ensure_role(c, 'agdc_user')
-        _ensure_role(c, 'agdc_ingest', inherits_from='agdc_user')
-        _ensure_role(c, 'agdc_manage', inherits_from='agdc_ingest')
-        _ensure_role(c, 'agdc_admin', inherits_from='agdc_manage', add_user=True)
-
-        c.execute(text("""
-        grant all on database {db} to agdc_admin;
-        """.format(db=quoted_db_name)))
-
-    if not has_schema(engine):
-        is_new = True
-        try:
-            sqla_txn = c.begin()
+    is_new = not has_schema(engine)
+    with engine.connect() as c:
+        # NB. Using default SQLA2.0 auto-begin commit-as-you-go behaviour
+        quoted_db_name, quoted_user = _get_quoted_connection_info(c)
+
+        if with_permissions:
+            _LOG.info('Ensuring user roles.')
+            _ensure_role(c, 'agdc_user')
+            _ensure_role(c, 'agdc_ingest', inherits_from='agdc_user')
+            _ensure_role(c, 'agdc_manage', inherits_from='agdc_ingest')
+            _ensure_role(c, 'agdc_admin', inherits_from='agdc_manage', add_user=True)
+
+            c.execute(text("""
+            grant all on database {db} to agdc_admin;
+            """.format(db=quoted_db_name)))
+            c.commit()
+
+        if is_new:
             if with_permissions:
                 # Switch to 'agdc_admin', so that all items are owned by them.
                 c.execute(text('set role agdc_admin'))
             _LOG.info('Creating schema.')
             c.execute(CreateSchema(SCHEMA_NAME))
-            _LOG.info('Creating tables.')
+            _LOG.info('Creating types.')
             c.execute(text(TYPES_INIT_SQL))
+            _LOG.info('Creating tables.')
             METADATA.create_all(c)
             _LOG.info("Creating triggers.")
             install_timestamp_trigger(c)
             _LOG.info("Creating added column.")
             install_added_column(c)
-            sqla_txn.commit()
-        except:  # noqa: E722
-            sqla_txn.rollback()
-            raise
-        finally:
             if with_permissions:
                 c.execute(text('set role {}'.format(quoted_user)))
-
-    if with_permissions:
-        _LOG.info('Adding role grants.')
-        c.execute(text("""
-        grant usage on schema {schema} to agdc_user;
-        grant select on all tables in schema {schema} to agdc_user;
-        grant execute on function {schema}.common_timestamp(text) to agdc_user;
-
-        grant insert on {schema}.dataset,
-                        {schema}.dataset_location,
-                        {schema}.dataset_source to agdc_ingest;
-        grant usage, select on all sequences in schema {schema} to agdc_ingest;
-
-        -- (We're only granting deletion of types that have nothing written yet: they can't delete the data itself)
-        grant insert, delete on {schema}.dataset_type,
-                                {schema}.metadata_type to agdc_manage;
-        -- Allow creation of indexes, views
-        grant create on schema {schema} to agdc_manage;
-        """.format(schema=SCHEMA_NAME)))
-
-    c.close()
+            c.commit()
+
+        if with_permissions:
+            _LOG.info('Adding role grants.')
+            c.execute(text("""
+            grant usage on schema {schema} to agdc_user;
+            grant select on all tables in schema {schema} to agdc_user;
+            grant execute on function {schema}.common_timestamp(text) to agdc_user;
+
+            grant insert on {schema}.dataset,
+                            {schema}.dataset_location,
+                            {schema}.dataset_source to agdc_ingest;
+            grant usage, select on all sequences in schema {schema} to agdc_ingest;
+
+            -- (We're only granting deletion of types that have nothing written yet: they can't delete the data itself)
+            grant insert, delete on {schema}.dataset_type,
+                                    {schema}.metadata_type to agdc_manage;
+            -- Allow creation of indexes, views
+            grant create on schema {schema} to agdc_manage;
+            """.format(schema=SCHEMA_NAME)))
+            c.commit()
 
     return is_new
diff --git a/datacube/drivers/postgres/_fields.py b/datacube/drivers/postgres/_fields.py
index 8e58bb020f..68819e7ab9 100755
--- a/datacube/drivers/postgres/_fields.py
+++ b/datacube/drivers/postgres/_fields.py
@@ -397,7 +397,7 @@ def between(self, low, high):
         """
         :rtype: Expression
         """
-        return RangeBetweenExpression(self, low, high, _range_class=NumericRange)
+        return RangeBetweenExpression(self, low, high, _range_class=func.agdc.float8range)
 
 
 class DateRangeDocField(RangeDocField):
diff --git a/datacube/drivers/postgres/sql.py b/datacube/drivers/postgres/sql.py
index 5801d95edb..7c421b4d1d 100644
--- a/datacube/drivers/postgres/sql.py
+++ b/datacube/drivers/postgres/sql.py
@@ -7,7 +7,8 @@
 """
 
 from sqlalchemy import TIMESTAMP, text
-from sqlalchemy.dialects.postgresql.ranges import RangeOperators
+from sqlalchemy.types import Double
+from sqlalchemy.dialects.postgresql.ranges import AbstractRange, Range
 from sqlalchemy.ext.compiler import compiles
 from sqlalchemy.sql import sqltypes
 from sqlalchemy.sql.expression import Executable, ClauseElement
@@ -74,7 +75,7 @@ def visit_create_view(element, compiler, **kw):
 
 
 # pylint: disable=abstract-method
-class FLOAT8RANGE(RangeOperators, sqltypes.TypeEngine):
+class FLOAT8RANGE(AbstractRange[Range[Double]]):
     __visit_name__ = 'FLOAT8RANGE'
 
 
diff --git a/datacube/index/postgis/_datasets.py b/datacube/index/postgis/_datasets.py
index 8a7fdcb9e0..840272a2ee 100755
--- a/datacube/index/postgis/_datasets.py
+++ b/datacube/index/postgis/_datasets.py
@@ -760,7 +760,7 @@ def search_summaries(self, **query):
         """
         for _, results in self._do_search_by_product(query, return_fields=True):
             for columns in results:
-                output = dict(columns)
+                output = columns._asdict()
                 _LOG.warning("search results: %s (%s)", output["id"], output["product"])
                 yield output
 
diff --git a/datacube/index/postgres/_datasets.py b/datacube/index/postgres/_datasets.py
index af29da81d8..522b17c701 100755
--- a/datacube/index/postgres/_datasets.py
+++ b/datacube/index/postgres/_datasets.py
@@ -739,13 +739,13 @@ def search_summaries(self, **query):
         """
         for _, results in self._do_search_by_product(query, return_fields=True):
             for columns in results:
-                yield dict(columns)
+                yield columns._asdict()
 
     def get_product_time_bounds(self, product: str):
         """
         Returns the minimum and maximum acquisition time of the product.
         """
-
+        # This implementation violates architecture - should not be SQLAlchemy code at this level.
         # Get the offsets from dataset doc
         product = self.types.get_by_name(product)
         dataset_section = product.metadata_type.definition['dataset']
@@ -769,7 +769,7 @@ def get_product_time_bounds(self, product: str):
         with self._db_connection() as connection:
             result = connection.execute(
                 select(
-                    [func.min(time_min.alchemy_expression), func.max(time_max.alchemy_expression)]
+                    func.min(time_min.alchemy_expression), func.max(time_max.alchemy_expression)
                 ).where(
                     DATASET.c.dataset_type_ref == product.id
                 )
diff --git a/docker/constraints.in b/docker/constraints.in
index ffb015b5c5..be24f578a2 100644
--- a/docker/constraints.in
+++ b/docker/constraints.in
@@ -35,7 +35,7 @@ shapely>=2.0
 sphinx-click
 sphinx_autodoc_typehints
 sphinx_rtd_theme
-sqlalchemy<2.0
+sqlalchemy>=2.0
 toolz
 xarray>=0.9
 
diff --git a/docker/constraints.txt b/docker/constraints.txt
index 35c1848e94..114dd381a9 100644
--- a/docker/constraints.txt
+++ b/docker/constraints.txt
@@ -380,7 +380,7 @@ sphinxcontrib-qthelp==1.0.3
     # via sphinx
 sphinxcontrib-serializinghtml==1.1.5
     # via sphinx
-sqlalchemy==1.4.46
+sqlalchemy==2.0.8
     # via
     #   -r constraints.in
     #   geoalchemy2
@@ -411,6 +411,7 @@ typing-extensions==4.4.0
     # via
     #   pygeoif
     #   setuptools-scm
+    #   sqlalchemy
 urllib3==1.26.14
     # via
     #   botocore
diff --git a/docs/about/whats_new.rst b/docs/about/whats_new.rst
index 1df3863343..4f3d6bc249 100644
--- a/docs/about/whats_new.rst
+++ b/docs/about/whats_new.rst
@@ -5,6 +5,14 @@ What's New
 **********
 
+v1.9.next
+=========
+
+- External Lineage API (:pull:`1401`)
+- Add lineage support to index clone operation (:pull:`1429`)
+- Migrate to SQLAlchemy 2.0 (:pull:`1432`)
+
+
 v1.8.next
 =========
 
diff --git a/integration_tests/index/test_config_docs.py b/integration_tests/index/test_config_docs.py
index 0ae002e5c9..9ea1e0ea79 100644
--- a/integration_tests/index/test_config_docs.py
+++ b/integration_tests/index/test_config_docs.py
@@ -9,6 +9,7 @@
 import copy
 
 import pytest
 import yaml
+from sqlalchemy import text
 
 from datacube.drivers.postgres._fields import NumericRangeDocField as PgrNumericRangeDocField, PgField as PgrPgField
 from datacube.drivers.postgis._fields import NumericRangeDocField as PgsNumericRangeDocField, PgField as PgsPgField
@@ -162,7 +163,7 @@ def _object_exists(index, index_name):
     else:
         schema_name = "agdc"
     with index._active_connection() as connection:
-        val = connection._connection.execute(f"SELECT to_regclass('{schema_name}.{index_name}')").scalar()
+        val = connection._connection.execute(text(f"SELECT to_regclass('{schema_name}.{index_name}')")).scalar()
     return val in (index_name, f'{schema_name}.{index_name}')
 
 
diff --git a/integration_tests/index/test_search_legacy.py b/integration_tests/index/test_search_legacy.py
index 5d9c7151e5..e88050a5db 100644
--- a/integration_tests/index/test_search_legacy.py
+++ b/integration_tests/index/test_search_legacy.py
@@ -15,7 +15,7 @@
 import pytest
 import yaml
 from dateutil import tz
-from psycopg2._range import NumericRange
+from sqlalchemy.dialects.postgresql.ranges import Range as SQLARange
 
 from datacube.config import LocalConfig
 from datacube.drivers.postgres._connections import DEFAULT_DB_USER
@@ -489,7 +489,6 @@ def test_search_returning(index: Index,
                           pseudo_ls8_type: Product,
                           pseudo_ls8_dataset: Dataset,
                           ls5_dataset_w_children) -> None:
-
     assert index.datasets.count() == 4, "Expected four test datasets"
 
     # Expect one product with our one dataset.
@@ -500,10 +499,11 @@ def test_search_returning(index: Index,
     ))
     assert len(results) == 1
     id_, path_range, sat_range = results[0]
+    path_range_type = path_range.__class__
     assert id_ == pseudo_ls8_dataset.id
     # TODO: output nicer types?
-    assert path_range == NumericRange(Decimal('116'), Decimal('116'), '[]')
-    assert sat_range == NumericRange(Decimal('74'), Decimal('84'), '[]')
+    assert path_range == SQLARange(lower=Decimal('116'), upper=Decimal('116'), bounds='[]')
+    assert sat_range == SQLARange(lower=Decimal('74'), upper=Decimal('84'), bounds='[]')
 
     results = list(index.datasets.search_returning(
         ('id', 'metadata_doc',),
@@ -875,8 +875,6 @@ def test_cli_missing_info(clirunner, index):
 def test_find_duplicates(index, pseudo_ls8_type,
                          pseudo_ls8_dataset, pseudo_ls8_dataset2, pseudo_ls8_dataset3, pseudo_ls8_dataset4,
                          ls5_dataset_w_children):
-    # type: (Index, Product, Dataset, Dataset, Dataset, Dataset, Dataset) -> None
-
     # Our four ls8 datasets and three ls5.
     all_datasets = index.datasets.search_eager()
     assert len(all_datasets) == 7
@@ -885,15 +883,15 @@ def test_find_duplicates(index, pseudo_ls8_type,
     expected_ls8_path_row_duplicates = [
         (
             (
-                NumericRange(Decimal('116'), Decimal('116'), '[]'),
-                NumericRange(Decimal('74'), Decimal('84'), '[]')
+                SQLARange(lower=Decimal('116'), upper=Decimal('116'), bounds='[]'),
+                SQLARange(lower=Decimal('74'), upper=Decimal('84'), bounds='[]')
             ),
             {pseudo_ls8_dataset.id, pseudo_ls8_dataset2.id}
         ),
         (
             (
-                NumericRange(Decimal('116'), Decimal('116'), '[]'),
-                NumericRange(Decimal('85'), Decimal('87'), '[]')
+                SQLARange(lower=Decimal('116'), upper=Decimal('116'), bounds='[]'),
+                SQLARange(lower=Decimal('85'), upper=Decimal('87'), bounds='[]')
             ),
             {pseudo_ls8_dataset3.id, pseudo_ls8_dataset4.id}
         ),
@@ -902,28 +900,24 @@ def test_find_duplicates(index, pseudo_ls8_type,
 
     # Specifying groups as fields:
     f = pseudo_ls8_type.metadata_type.dataset_fields.get
-    field_res = sorted(index.datasets.search_product_duplicates(
-        pseudo_ls8_type,
-        f('sat_path'), f('sat_row')
-    ))
-    assert field_res == expected_ls8_path_row_duplicates
+    field_res = list(
+        index.datasets.search_product_duplicates(
+            pseudo_ls8_type,
+            f('sat_path'), f('sat_row')
+        )
+    )
+    assert len(field_res) == len(expected_ls8_path_row_duplicates)
+    for field_result in field_res:
+        assert field_result in expected_ls8_path_row_duplicates
 
     # Field names as strings
-    product_res = sorted(index.datasets.search_product_duplicates(
+    product_res = list(index.datasets.search_product_duplicates(
         pseudo_ls8_type,
         'sat_path', 'sat_row'
     ))
     assert product_res == expected_ls8_path_row_duplicates
 
     # Get duplicates that start on the same day
-    f = pseudo_ls8_type.metadata_type.dataset_fields.get
-    field_res = sorted(index.datasets.search_product_duplicates(
-        pseudo_ls8_type,
-        f('time').lower.day  # type: ignore
-    ))
-
-    # Datasets 1 & 3 are on the 26th.
-    # Datasets 2 & 4 are on the 27th.
-    assert field_res == [
+    expected_time_day_duplicates = [
         (
             (
                 datetime.datetime(2014, 7, 26, 0, 0),
@@ -938,9 +932,22 @@ def test_find_duplicates(index, pseudo_ls8_type,
             ),
         ]
 
+    f = pseudo_ls8_type.metadata_type.dataset_fields.get
+    field_res = list(
+        index.datasets.search_product_duplicates(
+            pseudo_ls8_type,
+            f('time').lower.day  # type: ignore
+        )
+    )
+
+    # Datasets 1 & 3 are on the 26th.
+    # Datasets 2 & 4 are on the 27th.
+    assert len(field_res) == len(expected_time_day_duplicates)
+    for field_result in field_res:
+        assert field_result in expected_time_day_duplicates
 
     # No LS5 duplicates: there's only one of each
-    sat_res = sorted(index.datasets.search_product_duplicates(
+    sat_res = list(index.datasets.search_product_duplicates(
         ls5_dataset_w_children.product,
         'sat_path', 'sat_row'
     ))
diff --git a/integration_tests/index/test_update_columns.py b/integration_tests/index/test_update_columns.py
index e64b518246..9f3662f4ae 100644
--- a/integration_tests/index/test_update_columns.py
+++ b/integration_tests/index/test_update_columns.py
@@ -7,6 +7,7 @@
 `datacube system init`
 """
 import pytest
+from sqlalchemy import text
 
 from datacube.drivers.postgres.sql import SCHEMA_NAME
 from datacube.drivers.postgres import _schema
@@ -31,8 +32,10 @@ def check_column(conn, table_name: str, column_name: str) -> bool:
     column_result = conn.execute(
-        COLUMN_PRESENCE.format(
-            schema=SCHEMA_NAME, table=table_name, column=column_name
+        text(
+            COLUMN_PRESENCE.format(
+                schema=SCHEMA_NAME, table=table_name, column=column_name
+            )
         )
     ).fetchone()
     return column_result == (True,)
@@ -40,7 +43,9 @@ def check_trigger(conn, table_name: str) -> bool:
     trigger_result = conn.execute(
-        TRIGGER_PRESENCE.format(schema=SCHEMA_NAME, table=table_name)
+        text(
+            TRIGGER_PRESENCE.format(schema=SCHEMA_NAME, table=table_name)
+        )
     ).fetchone()
     if trigger_result is None:
         return False
@@ -48,8 +53,11 @@ def drop_column(conn, table: str, column: str):
-    conn.execute(DROP_COLUMN.format(
-        schema=SCHEMA_NAME, table=table, column=column))
+    conn.execute(
+        text(
+            DROP_COLUMN.format(schema=SCHEMA_NAME, table=table, column=column)
+        )
+    )
 
 
 @pytest.mark.parametrize('datacube_env_name', ('datacube', ))
diff --git a/setup.py b/setup.py
index 3d53532bc3..0802a713f3 100755
--- a/setup.py
+++ b/setup.py
@@ -106,7 +106,7 @@
         'pyyaml',
         'rasterio>=1.3.2',  # Warping broken in 1.3.0 and 1.3.1
         'ruamel.yaml',
-        'sqlalchemy>=1.4,<2.0',  # GeoAlchemy2 requires >=1.4. SqlAlchemy2 *may* work but has not been tested yet.
+        'sqlalchemy>=2.0',  # GeoAlchemy2 requires >=1.4; this release requires SQLAlchemy 2.0+.
         'GeoAlchemy2',
         'toolz',
         'xarray>=0.9',  # >0.9 fixes most problems with `crs` attributes being lost
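
A minimal sketch (not part of the patch) of the SQLAlchemy 2.0 idioms this changeset standardises on: columns passed to select() positionally rather than as a list, attribute-style row access and Row._asdict() in place of dict(row), and commit-as-you-go connections. The engine, table and column names below are illustrative only and are not taken from the datacube schema.

    # Illustrative only -- table/column names are made up, not from the datacube codebase.
    from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, func, select

    engine = create_engine("sqlite+pysqlite:///:memory:")
    metadata = MetaData()
    dataset = Table(
        "dataset", metadata,
        Column("id", Integer, primary_key=True),
        Column("label", String),
    )
    metadata.create_all(engine)

    with engine.connect() as conn:
        # Commit-as-you-go: execute() opens an implicit transaction, commit() ends it.
        conn.execute(dataset.insert(), [{"id": 1, "label": "a"}, {"id": 2, "label": "b"}])
        conn.commit()

        # 2.0 calling style: columns are positional arguments, not select([col1, col2]).
        query = select(dataset.c.id, dataset.c.label).where(dataset.c.id == 1)
        for row in conn.execute(query):
            print(row.label)       # attribute access replaces row["label"]
            print(row._asdict())   # mapping view replaces dict(row)

        # Aggregates follow the same convention, as in the get_duplicates()/count rewrites above.
        total = conn.execute(select(func.count()).select_from(dataset)).scalar()
        print(total)

The same conversion pattern -- select([cols]) to select(*cols), row["field"] to row.field -- is what the _api.py and _datasets.py hunks above apply throughout the postgres and postgis drivers.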