From e8ef3449701092f8cd1ef680a4b16f25ca4671ac Mon Sep 17 00:00:00 2001 From: David Bitner Date: Tue, 14 Nov 2023 08:47:56 -0600 Subject: [PATCH] Various Bug Fixes (#226) * fix bug where edtrange was set using datetime rather than end_datetime * fixes for partition generation, drop pydantic requirement to 1.7+ * make collection id not null fixes #224 * add queue options to pypgstac * fix issues to ensure creating indexes as pgstac_admin role * update changelog --- CHANGELOG.md | 17 + .../migrations/pgstac.0.8.1-unreleased.sql | 414 +++++++++++++++++- src/pgstac/migrations/pgstac.unreleased.sql | 224 +++++++--- src/pgstac/sql/000_idempotent_pre.sql | 13 +- src/pgstac/sql/002_collections.sql | 2 +- src/pgstac/sql/002a_queryables.sql | 170 ++++--- src/pgstac/sql/003b_partitions.sql | 20 +- src/pgstac/sql/998_idempotent_post.sql | 19 + src/pgstac/tests/basic/cql2_searches.sql | 1 + src/pgstac/tests/basic/cql2_searches.sql.out | 2 + src/pgstac/tests/basic/partitions.sql | 2 + src/pgstac/tests/basic/partitions.sql.out | 3 + src/pgstac/tests/pgtap.sql | 3 +- src/pgstac/tests/pgtap/001_core.sql | 10 + src/pypgstac/pyproject.toml | 3 +- src/pypgstac/python/pypgstac/db.py | 27 +- src/pypgstac/python/pypgstac/load.py | 3 + src/pypgstac/python/pypgstac/pypgstac.py | 11 +- src/pypgstac/tests/test_load.py | 12 +- 19 files changed, 801 insertions(+), 155 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 980cde71..493845f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,11 +5,28 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). ## [unreleased] + +## Added - Add support functions and tests for Collection Search - Add configuration parameter for base_url to be able to generate absolute links - With this release, this is only used to create links for paging in collection_search - Adds read only mode to allow use of pgstac on read replicas - Note: Turning on romode disables any caching (particularly when context is turned on) and does not allow to store q query hash that can be used with geometry_search. +- Add option to pypgstac loader "--usequeue" that forces use of the query queue for the loading process +- Add "pypgstac runqueue" command to run any commands that are set in the query queue + + ### Fixed + - Fix bug with end_datetime constraint management leading to inability to add data outside of constraints + - Fix bugs dealing with table ownership to ensure that all pgstac tables are owned by the pgstac_admin role + - Fixes issues with errors/warnings caused when doing index maintenance + - Fixes issues with errors/warnings caused with partition management +- Make sure that pgstac_ingest role always has read/write permissions on all tables +- Remove call to create_table_constraints from check_partition function. create_table_constraints was being called twice as it also gets called from update_partition_stats +- Add NOT NULL constraint to collections table (FIXES #224) +- Fix issue with indexes not getting created as the pg_admin role using SECURITY DEFINER + + ### Changed + - Revert pydantic requirement back to '>=1.7' and use basesettings conditionally from pydantic or pydantic.v1 to allow compatibility with pydantic 2 as well as with stac-fastapi that requires pydantic <2 ## [v0.8.1] diff --git a/src/pgstac/migrations/pgstac.0.8.1-unreleased.sql b/src/pgstac/migrations/pgstac.0.8.1-unreleased.sql index 0fff6b95..7d38f972 100644 --- a/src/pgstac/migrations/pgstac.0.8.1-unreleased.sql +++ b/src/pgstac/migrations/pgstac.0.8.1-unreleased.sql @@ -114,9 +114,20 @@ GRANT ALL ON SCHEMA pgstac TO pgstac_ingest; ALTER DEFAULT PRIVILEGES IN SCHEMA pgstac GRANT ALL ON TABLES TO pgstac_ingest; ALTER DEFAULT PRIVILEGES IN SCHEMA pgstac GRANT ALL ON FUNCTIONS TO pgstac_ingest; -SET ROLE pgstac_admin; +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_admin IN SCHEMA pgstac GRANT SELECT ON TABLES TO pgstac_read; +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_admin IN SCHEMA pgstac GRANT USAGE ON TYPES TO pgstac_read; +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_admin IN SCHEMA pgstac GRANT ALL ON SEQUENCES TO pgstac_read; +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_admin IN SCHEMA pgstac GRANT ALL ON TABLES TO pgstac_ingest; +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_admin IN SCHEMA pgstac GRANT ALL ON FUNCTIONS TO pgstac_ingest; + +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_ingest IN SCHEMA pgstac GRANT SELECT ON TABLES TO pgstac_read; +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_ingest IN SCHEMA pgstac GRANT USAGE ON TYPES TO pgstac_read; +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_ingest IN SCHEMA pgstac GRANT ALL ON SEQUENCES TO pgstac_read; +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_ingest IN SCHEMA pgstac GRANT ALL ON TABLES TO pgstac_ingest; +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_ingest IN SCHEMA pgstac GRANT ALL ON FUNCTIONS TO pgstac_ingest; SET SEARCH_PATH TO pgstac, public; +SET ROLE pgstac_admin; DO $$ BEGIN @@ -176,6 +187,8 @@ RETURNS timestamptz AS $$ ; $$ LANGUAGE SQL IMMUTABLE STRICT; -- BEGIN migra calculated SQL +alter table "pgstac"."collections" alter column "id" set not null; + set check_function_bodies = off; CREATE OR REPLACE FUNCTION pgstac.additional_properties() @@ -346,6 +359,88 @@ create or replace view "pgstac"."collections_asitems" as SELECT collections.id, FROM collections; +CREATE OR REPLACE FUNCTION pgstac.maintain_index(indexname text, queryable_idx text, dropindexes boolean DEFAULT false, rebuildindexes boolean DEFAULT false, idxconcurrently boolean DEFAULT false) + RETURNS void + LANGUAGE plpgsql + SECURITY DEFINER +AS $function$ +DECLARE +BEGIN + IF indexname IS NOT NULL THEN + IF dropindexes OR queryable_idx IS NOT NULL THEN + EXECUTE format('DROP INDEX IF EXISTS %I;', indexname); + ELSIF rebuildindexes THEN + IF idxconcurrently THEN + EXECUTE format('REINDEX INDEX CONCURRENTLY %I;', indexname); + ELSE + EXECUTE format('REINDEX INDEX CONCURRENTLY %I;', indexname); + END IF; + END IF; + END IF; + IF queryable_idx IS NOT NULL THEN + IF idxconcurrently THEN + EXECUTE replace(queryable_idx, 'INDEX', 'INDEX CONCURRENTLY'); + ELSE EXECUTE queryable_idx; + END IF; + END IF; +END; +$function$ +; + +CREATE OR REPLACE FUNCTION pgstac.queryable_indexes(treeroot text DEFAULT 'items'::text, changes boolean DEFAULT false, OUT collection text, OUT partition text, OUT field text, OUT indexname text, OUT existing_idx text, OUT queryable_idx text) + RETURNS SETOF record + LANGUAGE sql +AS $function$ +WITH p AS ( + SELECT + relid::text as partition, + replace(replace( + CASE + WHEN parentrelid::regclass::text='items' THEN pg_get_expr(c.relpartbound, c.oid) + ELSE pg_get_expr(parent.relpartbound, parent.oid) + END, + 'FOR VALUES IN (''',''), ''')', + '' + ) AS collection + FROM pg_partition_tree(treeroot) + JOIN pg_class c ON (relid::regclass = c.oid) + JOIN pg_class parent ON (parentrelid::regclass = parent.oid AND isleaf) + ), i AS ( + SELECT + partition, + indexname, + regexp_replace(btrim(replace(replace(indexdef, indexname, ''),'pgstac.',''),' \t\n'), '[ ]+', ' ', 'g') as iidx, + COALESCE( + (regexp_match(indexdef, '\(([a-zA-Z]+)\)'))[1], + (regexp_match(indexdef, '\(content -> ''properties''::text\) -> ''([a-zA-Z0-9\:\_-]+)''::text'))[1], + CASE WHEN indexdef ~* '\(datetime desc, end_datetime\)' THEN 'datetime' ELSE NULL END + ) AS field + FROM + pg_indexes + JOIN p ON (tablename=partition) + ), q AS ( + SELECT + name AS field, + collection, + partition, + format(indexdef(queryables), partition) as qidx + FROM queryables, unnest_collection(queryables.collection_ids) collection + JOIN p USING (collection) + WHERE property_index_type IS NOT NULL OR name IN ('datetime','geometry','id') + ) + SELECT + collection, + partition, + field, + indexname, + iidx as existing_idx, + qidx as queryable_idx + FROM i FULL JOIN q USING (field, partition) + WHERE CASE WHEN changes THEN lower(iidx) IS DISTINCT FROM lower(qidx) ELSE TRUE END; +; +$function$ +; + CREATE OR REPLACE FUNCTION pgstac.readonly(conf jsonb DEFAULT NULL::jsonb) RETURNS boolean LANGUAGE sql @@ -354,6 +449,142 @@ AS $function$ $function$ ; +CREATE OR REPLACE FUNCTION pgstac.check_partition(_collection text, _dtrange tstzrange, _edtrange tstzrange) + RETURNS text + LANGUAGE plpgsql + SECURITY DEFINER +AS $function$ +DECLARE + c RECORD; + pm RECORD; + _partition_name text; + _partition_dtrange tstzrange; + _constraint_dtrange tstzrange; + _constraint_edtrange tstzrange; + q text; + deferrable_q text; + err_context text; +BEGIN + SELECT * INTO c FROM pgstac.collections WHERE id=_collection; + IF NOT FOUND THEN + RAISE EXCEPTION 'Collection % does not exist', _collection USING ERRCODE = 'foreign_key_violation', HINT = 'Make sure collection exists before adding items'; + END IF; + + IF c.partition_trunc IS NOT NULL THEN + _partition_dtrange := tstzrange( + date_trunc(c.partition_trunc, lower(_dtrange)), + date_trunc(c.partition_trunc, lower(_dtrange)) + (concat('1 ', c.partition_trunc))::interval, + '[)' + ); + ELSE + _partition_dtrange := '[-infinity, infinity]'::tstzrange; + END IF; + + IF NOT _partition_dtrange @> _dtrange THEN + RAISE EXCEPTION 'dtrange % is greater than the partition size % for collection %', _dtrange, c.partition_trunc, _collection; + END IF; + + + IF c.partition_trunc = 'year' THEN + _partition_name := format('_items_%s_%s', c.key, to_char(lower(_partition_dtrange),'YYYY')); + ELSIF c.partition_trunc = 'month' THEN + _partition_name := format('_items_%s_%s', c.key, to_char(lower(_partition_dtrange),'YYYYMM')); + ELSE + _partition_name := format('_items_%s', c.key); + END IF; + + SELECT * INTO pm FROM partition_sys_meta WHERE collection=_collection AND partition_dtrange @> _dtrange; + IF FOUND THEN + RAISE NOTICE '% % %', _edtrange, _dtrange, pm; + _constraint_edtrange := + tstzrange( + least( + lower(_edtrange), + nullif(lower(pm.constraint_edtrange), '-infinity') + ), + greatest( + upper(_edtrange), + nullif(upper(pm.constraint_edtrange), 'infinity') + ), + '[]' + ); + _constraint_dtrange := + tstzrange( + least( + lower(_dtrange), + nullif(lower(pm.constraint_dtrange), '-infinity') + ), + greatest( + upper(_dtrange), + nullif(upper(pm.constraint_dtrange), 'infinity') + ), + '[]' + ); + + IF pm.constraint_edtrange @> _edtrange AND pm.constraint_dtrange @> _dtrange THEN + RETURN pm.partition; + ELSE + PERFORM drop_table_constraints(_partition_name); + END IF; + ELSE + _constraint_edtrange := _edtrange; + _constraint_dtrange := _dtrange; + END IF; + RAISE NOTICE 'EXISTING CONSTRAINTS % %, NEW % %', pm.constraint_dtrange, pm.constraint_edtrange, _constraint_dtrange, _constraint_edtrange; + RAISE NOTICE 'Creating partition % %', _partition_name, _partition_dtrange; + IF c.partition_trunc IS NULL THEN + q := format( + $q$ + CREATE TABLE IF NOT EXISTS %I partition OF items FOR VALUES IN (%L); + CREATE UNIQUE INDEX IF NOT EXISTS %I ON %I (id); + GRANT ALL ON %I to pgstac_ingest; + $q$, + _partition_name, + _collection, + concat(_partition_name,'_pk'), + _partition_name, + _partition_name + ); + ELSE + q := format( + $q$ + CREATE TABLE IF NOT EXISTS %I partition OF items FOR VALUES IN (%L) PARTITION BY RANGE (datetime); + CREATE TABLE IF NOT EXISTS %I partition OF %I FOR VALUES FROM (%L) TO (%L); + CREATE UNIQUE INDEX IF NOT EXISTS %I ON %I (id); + GRANT ALL ON %I TO pgstac_ingest; + $q$, + format('_items_%s', c.key), + _collection, + _partition_name, + format('_items_%s', c.key), + lower(_partition_dtrange), + upper(_partition_dtrange), + format('%s_pk', _partition_name), + _partition_name, + _partition_name + ); + END IF; + + BEGIN + EXECUTE q; + EXCEPTION + WHEN duplicate_table THEN + RAISE NOTICE 'Partition % already exists.', _partition_name; + WHEN others THEN + GET STACKED DIAGNOSTICS err_context = PG_EXCEPTION_CONTEXT; + RAISE INFO 'Error Name:%',SQLERRM; + RAISE INFO 'Error State:%', SQLSTATE; + RAISE INFO 'Error Context:%', err_context; + END; + PERFORM maintain_partitions(_partition_name); + PERFORM update_partition_stats_q(_partition_name, true); + REFRESH MATERIALIZED VIEW partitions; + REFRESH MATERIALIZED VIEW partition_steps; + RETURN _partition_name; +END; +$function$ +; + CREATE OR REPLACE FUNCTION pgstac.cql2_query(j jsonb, wrapper text DEFAULT NULL::text) RETURNS text LANGUAGE plpgsql @@ -717,6 +948,90 @@ END; $function$ ; +CREATE OR REPLACE FUNCTION pgstac.maintain_partition_queries(part text DEFAULT 'items'::text, dropindexes boolean DEFAULT false, rebuildindexes boolean DEFAULT false, idxconcurrently boolean DEFAULT false) + RETURNS SETOF text + LANGUAGE plpgsql +AS $function$ +DECLARE + rec record; + q text; +BEGIN + FOR rec IN ( + SELECT * FROM queryable_indexes(part,true) + ) LOOP + q := format( + 'SELECT maintain_index( + %L,%L,%L,%L,%L + );', + rec.indexname, + rec.queryable_idx, + dropindexes, + rebuildindexes, + idxconcurrently + ); + RAISE NOTICE 'Q: %s', q; + RETURN NEXT q; + END LOOP; + RETURN; +END; +$function$ +; + +CREATE OR REPLACE FUNCTION pgstac.repartition(_collection text, _partition_trunc text, triggered boolean DEFAULT false) + RETURNS text + LANGUAGE plpgsql + SECURITY DEFINER +AS $function$ +DECLARE + c RECORD; + q text; + from_trunc text; +BEGIN + SELECT * INTO c FROM pgstac.collections WHERE id=_collection; + IF NOT FOUND THEN + RAISE EXCEPTION 'Collection % does not exist', _collection USING ERRCODE = 'foreign_key_violation', HINT = 'Make sure collection exists before adding items'; + END IF; + IF triggered THEN + RAISE NOTICE 'Converting % to % partitioning via Trigger', _collection, _partition_trunc; + ELSE + RAISE NOTICE 'Converting % from using % to % partitioning', _collection, c.partition_trunc, _partition_trunc; + IF c.partition_trunc IS NOT DISTINCT FROM _partition_trunc THEN + RAISE NOTICE 'Collection % already set to use partition by %', _collection, _partition_trunc; + RETURN _collection; + END IF; + END IF; + + IF EXISTS (SELECT 1 FROM partitions_view WHERE collection=_collection LIMIT 1) THEN + EXECUTE format( + $q$ + CREATE TEMP TABLE changepartitionstaging ON COMMIT DROP AS SELECT * FROM %I; + DROP TABLE IF EXISTS %I CASCADE; + WITH p AS ( + SELECT + collection, + CASE + WHEN %L IS NULL THEN '-infinity'::timestamptz + ELSE date_trunc(%L, datetime) + END as d, + tstzrange(min(datetime),max(datetime),'[]') as dtrange, + tstzrange(min(end_datetime),max(end_datetime),'[]') as edtrange + FROM changepartitionstaging + GROUP BY 1,2 + ) SELECT check_partition(collection, dtrange, edtrange) FROM p; + INSERT INTO items SELECT * FROM changepartitionstaging; + DROP TABLE changepartitionstaging; + $q$, + concat('_items_', c.key), + concat('_items_', c.key), + c.partition_trunc, + c.partition_trunc + ); + END IF; + RETURN _collection; +END; +$function$ +; + CREATE OR REPLACE FUNCTION pgstac.search_query(_search jsonb DEFAULT '{}'::jsonb, updatestats boolean DEFAULT false, _metadata jsonb DEFAULT '{}'::jsonb) RETURNS searches LANGUAGE plpgsql @@ -787,6 +1102,84 @@ END; $function$ ; +CREATE OR REPLACE FUNCTION pgstac.update_partition_stats(_partition text, istrigger boolean DEFAULT false) + RETURNS void + LANGUAGE plpgsql + STRICT SECURITY DEFINER +AS $function$ +DECLARE + dtrange tstzrange; + edtrange tstzrange; + cdtrange tstzrange; + cedtrange tstzrange; + extent geometry; + collection text; +BEGIN + RAISE NOTICE 'Updating stats for %.', _partition; + EXECUTE format( + $q$ + SELECT + tstzrange(min(datetime), max(datetime),'[]'), + tstzrange(min(end_datetime), max(end_datetime), '[]') + FROM %I + $q$, + _partition + ) INTO dtrange, edtrange; + extent := st_estimatedextent('pgstac', _partition, 'geometry'); + INSERT INTO partition_stats (partition, dtrange, edtrange, spatial, last_updated) + SELECT _partition, dtrange, edtrange, extent, now() + ON CONFLICT (partition) DO + UPDATE SET + dtrange=EXCLUDED.dtrange, + edtrange=EXCLUDED.edtrange, + spatial=EXCLUDED.spatial, + last_updated=EXCLUDED.last_updated + ; + + SELECT + constraint_dtrange, constraint_edtrange, pv.collection + INTO cdtrange, cedtrange, collection + FROM partitions_view pv WHERE partition = _partition; + REFRESH MATERIALIZED VIEW partitions; + REFRESH MATERIALIZED VIEW partition_steps; + + + RAISE NOTICE 'Checking if we need to modify constraints...'; + RAISE NOTICE 'cdtrange: % dtrange: % cedtrange: % edtrange: %',cdtrange, dtrange, cedtrange, edtrange; + IF + (cdtrange IS DISTINCT FROM dtrange OR edtrange IS DISTINCT FROM cedtrange) + AND NOT istrigger + THEN + RAISE NOTICE 'Modifying Constraints'; + RAISE NOTICE 'Existing % %', cdtrange, cedtrange; + RAISE NOTICE 'New % %', dtrange, edtrange; + PERFORM drop_table_constraints(_partition); + PERFORM create_table_constraints(_partition, dtrange, edtrange); + REFRESH MATERIALIZED VIEW partitions; + REFRESH MATERIALIZED VIEW partition_steps; + END IF; + RAISE NOTICE 'Checking if we need to update collection extents.'; + IF get_setting_bool('update_collection_extent') THEN + RAISE NOTICE 'updating collection extent for %', collection; + PERFORM run_or_queue(format($q$ + UPDATE collections + SET content = jsonb_set_lax( + content, + '{extent}'::text[], + collection_extent(%L, FALSE), + true, + 'use_json_null' + ) WHERE id=%L + ; + $q$, collection, collection)); + ELSE + RAISE NOTICE 'Not updating collection extent for %', collection; + END IF; + +END; +$function$ +; + CREATE OR REPLACE FUNCTION pgstac.where_stats(inwhere text, updatestats boolean DEFAULT false, conf jsonb DEFAULT NULL::jsonb) RETURNS search_wheres LANGUAGE plpgsql @@ -964,6 +1357,16 @@ ALTER FUNCTION to_int COST 5000; ALTER FUNCTION to_tstz COST 5000; ALTER FUNCTION to_text_array COST 5000; +ALTER FUNCTION update_partition_stats SECURITY DEFINER; +ALTER FUNCTION partition_after_triggerfunc SECURITY DEFINER; +ALTER FUNCTION drop_table_constraints SECURITY DEFINER; +ALTER FUNCTION create_table_constraints SECURITY DEFINER; +ALTER FUNCTION check_partition SECURITY DEFINER; +ALTER FUNCTION repartition SECURITY DEFINER; +ALTER FUNCTION where_stats SECURITY DEFINER; +ALTER FUNCTION search_query SECURITY DEFINER; +ALTER FUNCTION format_item SECURITY DEFINER; +ALTER FUNCTION maintain_index SECURITY DEFINER; GRANT USAGE ON SCHEMA pgstac to pgstac_read; GRANT ALL ON SCHEMA pgstac to pgstac_ingest; @@ -981,5 +1384,14 @@ GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA pgstac to pgstac_ingest; GRANT ALL ON ALL TABLES IN SCHEMA pgstac to pgstac_ingest; GRANT USAGE ON ALL SEQUENCES IN SCHEMA pgstac to pgstac_ingest; +REVOKE ALL PRIVILEGES ON PROCEDURE run_queued_queries FROM public; +GRANT ALL ON PROCEDURE run_queued_queries TO pgstac_admin; + +REVOKE ALL PRIVILEGES ON FUNCTION run_queued_queries_intransaction FROM public; +GRANT ALL ON FUNCTION run_queued_queries_intransaction TO pgstac_admin; + +RESET ROLE; + +SET ROLE pgstac_ingest; SELECT update_partition_stats_q(partition) FROM partitions_view; SELECT set_version('unreleased'); diff --git a/src/pgstac/migrations/pgstac.unreleased.sql b/src/pgstac/migrations/pgstac.unreleased.sql index 37e2c2aa..65d527c2 100644 --- a/src/pgstac/migrations/pgstac.unreleased.sql +++ b/src/pgstac/migrations/pgstac.unreleased.sql @@ -112,9 +112,20 @@ GRANT ALL ON SCHEMA pgstac TO pgstac_ingest; ALTER DEFAULT PRIVILEGES IN SCHEMA pgstac GRANT ALL ON TABLES TO pgstac_ingest; ALTER DEFAULT PRIVILEGES IN SCHEMA pgstac GRANT ALL ON FUNCTIONS TO pgstac_ingest; -SET ROLE pgstac_admin; +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_admin IN SCHEMA pgstac GRANT SELECT ON TABLES TO pgstac_read; +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_admin IN SCHEMA pgstac GRANT USAGE ON TYPES TO pgstac_read; +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_admin IN SCHEMA pgstac GRANT ALL ON SEQUENCES TO pgstac_read; +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_admin IN SCHEMA pgstac GRANT ALL ON TABLES TO pgstac_ingest; +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_admin IN SCHEMA pgstac GRANT ALL ON FUNCTIONS TO pgstac_ingest; + +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_ingest IN SCHEMA pgstac GRANT SELECT ON TABLES TO pgstac_read; +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_ingest IN SCHEMA pgstac GRANT USAGE ON TYPES TO pgstac_read; +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_ingest IN SCHEMA pgstac GRANT ALL ON SEQUENCES TO pgstac_read; +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_ingest IN SCHEMA pgstac GRANT ALL ON TABLES TO pgstac_ingest; +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_ingest IN SCHEMA pgstac GRANT ALL ON FUNCTIONS TO pgstac_ingest; SET SEARCH_PATH TO pgstac, public; +SET ROLE pgstac_admin; DO $$ BEGIN @@ -845,7 +856,7 @@ $$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE; CREATE TABLE IF NOT EXISTS collections ( key bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY, - id text GENERATED ALWAYS AS (content->>'id') STORED UNIQUE, + id text GENERATED ALWAYS AS (content->>'id') STORED UNIQUE NOT NULL, content JSONB NOT NULL, base_item jsonb GENERATED ALWAYS AS (pgstac.collection_base_item(content)) STORED, geometry geometry GENERATED ALWAYS AS (pgstac.collection_geom(content)) STORED, @@ -1115,6 +1126,7 @@ BEGIN END; $$ LANGUAGE PLPGSQL IMMUTABLE STRICT PARALLEL SAFE; + CREATE OR REPLACE FUNCTION indexdef(q queryables) RETURNS text AS $$ DECLARE out text; @@ -1136,7 +1148,6 @@ CREATE OR REPLACE FUNCTION indexdef(q queryables) RETURNS text AS $$ END; $$ LANGUAGE PLPGSQL IMMUTABLE; - DROP VIEW IF EXISTS pgstac_indexes; CREATE VIEW pgstac_indexes AS SELECT @@ -1179,6 +1190,96 @@ FROM LEFT JOIN pg_stats s ON (s.tablename = i.indexname) WHERE i.schemaname='pgstac' and i.tablename ~ '_items_'; +CREATE OR REPLACE FUNCTION queryable_indexes( + IN treeroot text DEFAULT 'items', + IN changes boolean DEFAULT FALSE, + OUT collection text, + OUT partition text, + OUT field text, + OUT indexname text, + OUT existing_idx text, + OUT queryable_idx text +) RETURNS SETOF RECORD AS $$ +WITH p AS ( + SELECT + relid::text as partition, + replace(replace( + CASE + WHEN parentrelid::regclass::text='items' THEN pg_get_expr(c.relpartbound, c.oid) + ELSE pg_get_expr(parent.relpartbound, parent.oid) + END, + 'FOR VALUES IN (''',''), ''')', + '' + ) AS collection + FROM pg_partition_tree(treeroot) + JOIN pg_class c ON (relid::regclass = c.oid) + JOIN pg_class parent ON (parentrelid::regclass = parent.oid AND isleaf) + ), i AS ( + SELECT + partition, + indexname, + regexp_replace(btrim(replace(replace(indexdef, indexname, ''),'pgstac.',''),' \t\n'), '[ ]+', ' ', 'g') as iidx, + COALESCE( + (regexp_match(indexdef, '\(([a-zA-Z]+)\)'))[1], + (regexp_match(indexdef, '\(content -> ''properties''::text\) -> ''([a-zA-Z0-9\:\_-]+)''::text'))[1], + CASE WHEN indexdef ~* '\(datetime desc, end_datetime\)' THEN 'datetime' ELSE NULL END + ) AS field + FROM + pg_indexes + JOIN p ON (tablename=partition) + ), q AS ( + SELECT + name AS field, + collection, + partition, + format(indexdef(queryables), partition) as qidx + FROM queryables, unnest_collection(queryables.collection_ids) collection + JOIN p USING (collection) + WHERE property_index_type IS NOT NULL OR name IN ('datetime','geometry','id') + ) + SELECT + collection, + partition, + field, + indexname, + iidx as existing_idx, + qidx as queryable_idx + FROM i FULL JOIN q USING (field, partition) + WHERE CASE WHEN changes THEN lower(iidx) IS DISTINCT FROM lower(qidx) ELSE TRUE END; +; +$$ LANGUAGE SQL; + +CREATE OR REPLACE FUNCTION maintain_index( + indexname text, + queryable_idx text, + dropindexes boolean DEFAULT FALSE, + rebuildindexes boolean DEFAULT FALSE, + idxconcurrently boolean DEFAULT FALSE +) RETURNS VOID AS $$ +DECLARE +BEGIN + IF indexname IS NOT NULL THEN + IF dropindexes OR queryable_idx IS NOT NULL THEN + EXECUTE format('DROP INDEX IF EXISTS %I;', indexname); + ELSIF rebuildindexes THEN + IF idxconcurrently THEN + EXECUTE format('REINDEX INDEX CONCURRENTLY %I;', indexname); + ELSE + EXECUTE format('REINDEX INDEX CONCURRENTLY %I;', indexname); + END IF; + END IF; + END IF; + IF queryable_idx IS NOT NULL THEN + IF idxconcurrently THEN + EXECUTE replace(queryable_idx, 'INDEX', 'INDEX CONCURRENTLY'); + ELSE EXECUTE queryable_idx; + END IF; + END IF; +END; +$$ LANGUAGE PLPGSQL SECURITY DEFINER; + + + set check_function_bodies to off; CREATE OR REPLACE FUNCTION maintain_partition_queries( part text DEFAULT 'items', @@ -1188,73 +1289,23 @@ CREATE OR REPLACE FUNCTION maintain_partition_queries( ) RETURNS SETOF text AS $$ DECLARE rec record; + q text; BEGIN FOR rec IN ( - WITH p AS ( - SELECT - relid::text as partition, - replace(replace( - CASE - WHEN level = 1 THEN pg_get_expr(c.relpartbound, c.oid) - ELSE pg_get_expr(parent.relpartbound, parent.oid) - END, - 'FOR VALUES IN (''',''), ''')', - '' - ) AS collection - FROM pg_partition_tree('items') - JOIN pg_class c ON (relid::regclass = c.oid) - JOIN pg_class parent ON (parentrelid::regclass = parent.oid AND isleaf) - ), i AS ( - SELECT - partition, - indexname, - regexp_replace(btrim(replace(replace(indexdef, indexname, ''),'pgstac.',''),' \t\n'), '[ ]+', ' ', 'g') as iidx, - COALESCE( - (regexp_match(indexdef, '\(([a-zA-Z]+)\)'))[1], - (regexp_match(indexdef, '\(content -> ''properties''::text\) -> ''([a-zA-Z0-9\:\_-]+)''::text'))[1], - CASE WHEN indexdef ~* '\(datetime desc, end_datetime\)' THEN 'datetime' ELSE NULL END - ) AS field - FROM - pg_indexes - JOIN p ON (tablename=partition) - ), q AS ( - SELECT - name AS field, - collection, - partition, - format(indexdef(queryables), partition) as qidx - FROM queryables, unnest_collection(queryables.collection_ids) collection - JOIN p USING (collection) - WHERE property_index_type IS NOT NULL OR name IN ('datetime','geometry','id') - ) - SELECT * FROM i FULL JOIN q USING (field, partition) - WHERE lower(iidx) IS DISTINCT FROM lower(qidx) + SELECT * FROM queryable_indexes(part,true) ) LOOP - IF rec.iidx IS NULL THEN - IF idxconcurrently THEN - RETURN NEXT replace(rec.qidx, 'INDEX', 'INDEX CONCURRENTLY'); - ELSE - RETURN NEXT rec.qidx; - END IF; - ELSIF rec.qidx IS NULL AND dropindexes THEN - RETURN NEXT format('DROP INDEX IF EXISTS %I;', rec.indexname); - ELSIF lower(rec.qidx) != lower(rec.iidx) THEN - IF dropindexes THEN - RETURN NEXT format('DROP INDEX IF EXISTS %I; %s;', rec.indexname, rec.qidx); - ELSE - IF idxconcurrently THEN - RETURN NEXT replace(rec.qidx, 'INDEX', 'INDEX CONCURRENTLY'); - ELSE - RETURN NEXT rec.qidx; - END IF; - END IF; - ELSIF rebuildindexes and rec.indexname IS NOT NULL THEN - IF idxconcurrently THEN - RETURN NEXT format('REINDEX INDEX CONCURRENTLY %I;', rec.indexname); - ELSE - RETURN NEXT format('REINDEX INDEX %I;', rec.indexname); - END IF; - END IF; + q := format( + 'SELECT maintain_index( + %L,%L,%L,%L,%L + );', + rec.indexname, + rec.queryable_idx, + dropindexes, + rebuildindexes, + idxconcurrently + ); + RAISE NOTICE 'Q: %s', q; + RETURN NEXT q; END LOOP; RETURN; END; @@ -2468,7 +2519,8 @@ BEGIN REFRESH MATERIALIZED VIEW partition_steps; - RAISE NOTICE 'Checking if we need to modify constraints.'; + RAISE NOTICE 'Checking if we need to modify constraints...'; + RAISE NOTICE 'cdtrange: % dtrange: % cedtrange: % edtrange: %',cdtrange, dtrange, cedtrange, edtrange; IF (cdtrange IS DISTINCT FROM dtrange OR edtrange IS DISTINCT FROM cedtrange) AND NOT istrigger @@ -2478,6 +2530,8 @@ BEGIN RAISE NOTICE 'New % %', dtrange, edtrange; PERFORM drop_table_constraints(_partition); PERFORM create_table_constraints(_partition, dtrange, edtrange); + REFRESH MATERIALIZED VIEW partitions; + REFRESH MATERIALIZED VIEW partition_steps; END IF; RAISE NOTICE 'Checking if we need to update collection extents.'; IF get_setting_bool('update_collection_extent') THEN @@ -2498,7 +2552,7 @@ BEGIN END IF; END; -$$ LANGUAGE PLPGSQL STRICT; +$$ LANGUAGE PLPGSQL STRICT SECURITY DEFINER; CREATE OR REPLACE FUNCTION partition_name( IN collection text, IN dt timestamptz, OUT partition_name text, OUT partition_range tstzrange) AS $$ @@ -2719,16 +2773,19 @@ BEGIN _constraint_edtrange := _edtrange; _constraint_dtrange := _dtrange; END IF; + RAISE NOTICE 'EXISTING CONSTRAINTS % %, NEW % %', pm.constraint_dtrange, pm.constraint_edtrange, _constraint_dtrange, _constraint_edtrange; RAISE NOTICE 'Creating partition % %', _partition_name, _partition_dtrange; IF c.partition_trunc IS NULL THEN q := format( $q$ CREATE TABLE IF NOT EXISTS %I partition OF items FOR VALUES IN (%L); CREATE UNIQUE INDEX IF NOT EXISTS %I ON %I (id); + GRANT ALL ON %I to pgstac_ingest; $q$, _partition_name, _collection, concat(_partition_name,'_pk'), + _partition_name, _partition_name ); ELSE @@ -2737,6 +2794,7 @@ BEGIN CREATE TABLE IF NOT EXISTS %I partition OF items FOR VALUES IN (%L) PARTITION BY RANGE (datetime); CREATE TABLE IF NOT EXISTS %I partition OF %I FOR VALUES FROM (%L) TO (%L); CREATE UNIQUE INDEX IF NOT EXISTS %I ON %I (id); + GRANT ALL ON %I TO pgstac_ingest; $q$, format('_items_%s', c.key), _collection, @@ -2745,6 +2803,7 @@ BEGIN lower(_partition_dtrange), upper(_partition_dtrange), format('%s_pk', _partition_name), + _partition_name, _partition_name ); END IF; @@ -2760,7 +2819,6 @@ BEGIN RAISE INFO 'Error State:%', SQLSTATE; RAISE INFO 'Error Context:%', err_context; END; - PERFORM create_table_constraints(_partition_name, _constraint_dtrange, _constraint_edtrange); PERFORM maintain_partitions(_partition_name); PERFORM update_partition_stats_q(_partition_name, true); REFRESH MATERIALIZED VIEW partitions; @@ -2798,11 +2856,12 @@ BEGIN WITH p AS ( SELECT collection, - CASE WHEN %L IS NULL THEN '-infinity'::timestamptz - ELSE date_trunc(%L, datetime) + CASE + WHEN %L IS NULL THEN '-infinity'::timestamptz + ELSE date_trunc(%L, datetime) END as d, tstzrange(min(datetime),max(datetime),'[]') as dtrange, - tstzrange(min(datetime),max(datetime),'[]') as edtrange + tstzrange(min(end_datetime),max(end_datetime),'[]') as edtrange FROM changepartitionstaging GROUP BY 1,2 ) SELECT check_partition(collection, dtrange, edtrange) FROM p; @@ -4284,6 +4343,16 @@ ALTER FUNCTION to_int COST 5000; ALTER FUNCTION to_tstz COST 5000; ALTER FUNCTION to_text_array COST 5000; +ALTER FUNCTION update_partition_stats SECURITY DEFINER; +ALTER FUNCTION partition_after_triggerfunc SECURITY DEFINER; +ALTER FUNCTION drop_table_constraints SECURITY DEFINER; +ALTER FUNCTION create_table_constraints SECURITY DEFINER; +ALTER FUNCTION check_partition SECURITY DEFINER; +ALTER FUNCTION repartition SECURITY DEFINER; +ALTER FUNCTION where_stats SECURITY DEFINER; +ALTER FUNCTION search_query SECURITY DEFINER; +ALTER FUNCTION format_item SECURITY DEFINER; +ALTER FUNCTION maintain_index SECURITY DEFINER; GRANT USAGE ON SCHEMA pgstac to pgstac_read; GRANT ALL ON SCHEMA pgstac to pgstac_ingest; @@ -4301,5 +4370,14 @@ GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA pgstac to pgstac_ingest; GRANT ALL ON ALL TABLES IN SCHEMA pgstac to pgstac_ingest; GRANT USAGE ON ALL SEQUENCES IN SCHEMA pgstac to pgstac_ingest; +REVOKE ALL PRIVILEGES ON PROCEDURE run_queued_queries FROM public; +GRANT ALL ON PROCEDURE run_queued_queries TO pgstac_admin; + +REVOKE ALL PRIVILEGES ON FUNCTION run_queued_queries_intransaction FROM public; +GRANT ALL ON FUNCTION run_queued_queries_intransaction TO pgstac_admin; + +RESET ROLE; + +SET ROLE pgstac_ingest; SELECT update_partition_stats_q(partition) FROM partitions_view; SELECT set_version('unreleased'); diff --git a/src/pgstac/sql/000_idempotent_pre.sql b/src/pgstac/sql/000_idempotent_pre.sql index 6c0df029..6b00e2b5 100644 --- a/src/pgstac/sql/000_idempotent_pre.sql +++ b/src/pgstac/sql/000_idempotent_pre.sql @@ -112,9 +112,20 @@ GRANT ALL ON SCHEMA pgstac TO pgstac_ingest; ALTER DEFAULT PRIVILEGES IN SCHEMA pgstac GRANT ALL ON TABLES TO pgstac_ingest; ALTER DEFAULT PRIVILEGES IN SCHEMA pgstac GRANT ALL ON FUNCTIONS TO pgstac_ingest; -SET ROLE pgstac_admin; +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_admin IN SCHEMA pgstac GRANT SELECT ON TABLES TO pgstac_read; +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_admin IN SCHEMA pgstac GRANT USAGE ON TYPES TO pgstac_read; +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_admin IN SCHEMA pgstac GRANT ALL ON SEQUENCES TO pgstac_read; +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_admin IN SCHEMA pgstac GRANT ALL ON TABLES TO pgstac_ingest; +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_admin IN SCHEMA pgstac GRANT ALL ON FUNCTIONS TO pgstac_ingest; + +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_ingest IN SCHEMA pgstac GRANT SELECT ON TABLES TO pgstac_read; +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_ingest IN SCHEMA pgstac GRANT USAGE ON TYPES TO pgstac_read; +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_ingest IN SCHEMA pgstac GRANT ALL ON SEQUENCES TO pgstac_read; +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_ingest IN SCHEMA pgstac GRANT ALL ON TABLES TO pgstac_ingest; +ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_ingest IN SCHEMA pgstac GRANT ALL ON FUNCTIONS TO pgstac_ingest; SET SEARCH_PATH TO pgstac, public; +SET ROLE pgstac_admin; DO $$ BEGIN diff --git a/src/pgstac/sql/002_collections.sql b/src/pgstac/sql/002_collections.sql index 6a87ad31..8447d27e 100644 --- a/src/pgstac/sql/002_collections.sql +++ b/src/pgstac/sql/002_collections.sql @@ -10,7 +10,7 @@ $$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE; CREATE TABLE IF NOT EXISTS collections ( key bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY, - id text GENERATED ALWAYS AS (content->>'id') STORED UNIQUE, + id text GENERATED ALWAYS AS (content->>'id') STORED UNIQUE NOT NULL, content JSONB NOT NULL, base_item jsonb GENERATED ALWAYS AS (pgstac.collection_base_item(content)) STORED, geometry geometry GENERATED ALWAYS AS (pgstac.collection_geom(content)) STORED, diff --git a/src/pgstac/sql/002a_queryables.sql b/src/pgstac/sql/002a_queryables.sql index 6d5ec060..347f5f61 100644 --- a/src/pgstac/sql/002a_queryables.sql +++ b/src/pgstac/sql/002a_queryables.sql @@ -189,6 +189,7 @@ BEGIN END; $$ LANGUAGE PLPGSQL IMMUTABLE STRICT PARALLEL SAFE; + CREATE OR REPLACE FUNCTION indexdef(q queryables) RETURNS text AS $$ DECLARE out text; @@ -210,7 +211,6 @@ CREATE OR REPLACE FUNCTION indexdef(q queryables) RETURNS text AS $$ END; $$ LANGUAGE PLPGSQL IMMUTABLE; - DROP VIEW IF EXISTS pgstac_indexes; CREATE VIEW pgstac_indexes AS SELECT @@ -253,6 +253,96 @@ FROM LEFT JOIN pg_stats s ON (s.tablename = i.indexname) WHERE i.schemaname='pgstac' and i.tablename ~ '_items_'; +CREATE OR REPLACE FUNCTION queryable_indexes( + IN treeroot text DEFAULT 'items', + IN changes boolean DEFAULT FALSE, + OUT collection text, + OUT partition text, + OUT field text, + OUT indexname text, + OUT existing_idx text, + OUT queryable_idx text +) RETURNS SETOF RECORD AS $$ +WITH p AS ( + SELECT + relid::text as partition, + replace(replace( + CASE + WHEN parentrelid::regclass::text='items' THEN pg_get_expr(c.relpartbound, c.oid) + ELSE pg_get_expr(parent.relpartbound, parent.oid) + END, + 'FOR VALUES IN (''',''), ''')', + '' + ) AS collection + FROM pg_partition_tree(treeroot) + JOIN pg_class c ON (relid::regclass = c.oid) + JOIN pg_class parent ON (parentrelid::regclass = parent.oid AND isleaf) + ), i AS ( + SELECT + partition, + indexname, + regexp_replace(btrim(replace(replace(indexdef, indexname, ''),'pgstac.',''),' \t\n'), '[ ]+', ' ', 'g') as iidx, + COALESCE( + (regexp_match(indexdef, '\(([a-zA-Z]+)\)'))[1], + (regexp_match(indexdef, '\(content -> ''properties''::text\) -> ''([a-zA-Z0-9\:\_-]+)''::text'))[1], + CASE WHEN indexdef ~* '\(datetime desc, end_datetime\)' THEN 'datetime' ELSE NULL END + ) AS field + FROM + pg_indexes + JOIN p ON (tablename=partition) + ), q AS ( + SELECT + name AS field, + collection, + partition, + format(indexdef(queryables), partition) as qidx + FROM queryables, unnest_collection(queryables.collection_ids) collection + JOIN p USING (collection) + WHERE property_index_type IS NOT NULL OR name IN ('datetime','geometry','id') + ) + SELECT + collection, + partition, + field, + indexname, + iidx as existing_idx, + qidx as queryable_idx + FROM i FULL JOIN q USING (field, partition) + WHERE CASE WHEN changes THEN lower(iidx) IS DISTINCT FROM lower(qidx) ELSE TRUE END; +; +$$ LANGUAGE SQL; + +CREATE OR REPLACE FUNCTION maintain_index( + indexname text, + queryable_idx text, + dropindexes boolean DEFAULT FALSE, + rebuildindexes boolean DEFAULT FALSE, + idxconcurrently boolean DEFAULT FALSE +) RETURNS VOID AS $$ +DECLARE +BEGIN + IF indexname IS NOT NULL THEN + IF dropindexes OR queryable_idx IS NOT NULL THEN + EXECUTE format('DROP INDEX IF EXISTS %I;', indexname); + ELSIF rebuildindexes THEN + IF idxconcurrently THEN + EXECUTE format('REINDEX INDEX CONCURRENTLY %I;', indexname); + ELSE + EXECUTE format('REINDEX INDEX CONCURRENTLY %I;', indexname); + END IF; + END IF; + END IF; + IF queryable_idx IS NOT NULL THEN + IF idxconcurrently THEN + EXECUTE replace(queryable_idx, 'INDEX', 'INDEX CONCURRENTLY'); + ELSE EXECUTE queryable_idx; + END IF; + END IF; +END; +$$ LANGUAGE PLPGSQL SECURITY DEFINER; + + + set check_function_bodies to off; CREATE OR REPLACE FUNCTION maintain_partition_queries( part text DEFAULT 'items', @@ -262,73 +352,23 @@ CREATE OR REPLACE FUNCTION maintain_partition_queries( ) RETURNS SETOF text AS $$ DECLARE rec record; + q text; BEGIN FOR rec IN ( - WITH p AS ( - SELECT - relid::text as partition, - replace(replace( - CASE - WHEN level = 1 THEN pg_get_expr(c.relpartbound, c.oid) - ELSE pg_get_expr(parent.relpartbound, parent.oid) - END, - 'FOR VALUES IN (''',''), ''')', - '' - ) AS collection - FROM pg_partition_tree('items') - JOIN pg_class c ON (relid::regclass = c.oid) - JOIN pg_class parent ON (parentrelid::regclass = parent.oid AND isleaf) - ), i AS ( - SELECT - partition, - indexname, - regexp_replace(btrim(replace(replace(indexdef, indexname, ''),'pgstac.',''),' \t\n'), '[ ]+', ' ', 'g') as iidx, - COALESCE( - (regexp_match(indexdef, '\(([a-zA-Z]+)\)'))[1], - (regexp_match(indexdef, '\(content -> ''properties''::text\) -> ''([a-zA-Z0-9\:\_-]+)''::text'))[1], - CASE WHEN indexdef ~* '\(datetime desc, end_datetime\)' THEN 'datetime' ELSE NULL END - ) AS field - FROM - pg_indexes - JOIN p ON (tablename=partition) - ), q AS ( - SELECT - name AS field, - collection, - partition, - format(indexdef(queryables), partition) as qidx - FROM queryables, unnest_collection(queryables.collection_ids) collection - JOIN p USING (collection) - WHERE property_index_type IS NOT NULL OR name IN ('datetime','geometry','id') - ) - SELECT * FROM i FULL JOIN q USING (field, partition) - WHERE lower(iidx) IS DISTINCT FROM lower(qidx) + SELECT * FROM queryable_indexes(part,true) ) LOOP - IF rec.iidx IS NULL THEN - IF idxconcurrently THEN - RETURN NEXT replace(rec.qidx, 'INDEX', 'INDEX CONCURRENTLY'); - ELSE - RETURN NEXT rec.qidx; - END IF; - ELSIF rec.qidx IS NULL AND dropindexes THEN - RETURN NEXT format('DROP INDEX IF EXISTS %I;', rec.indexname); - ELSIF lower(rec.qidx) != lower(rec.iidx) THEN - IF dropindexes THEN - RETURN NEXT format('DROP INDEX IF EXISTS %I; %s;', rec.indexname, rec.qidx); - ELSE - IF idxconcurrently THEN - RETURN NEXT replace(rec.qidx, 'INDEX', 'INDEX CONCURRENTLY'); - ELSE - RETURN NEXT rec.qidx; - END IF; - END IF; - ELSIF rebuildindexes and rec.indexname IS NOT NULL THEN - IF idxconcurrently THEN - RETURN NEXT format('REINDEX INDEX CONCURRENTLY %I;', rec.indexname); - ELSE - RETURN NEXT format('REINDEX INDEX %I;', rec.indexname); - END IF; - END IF; + q := format( + 'SELECT maintain_index( + %L,%L,%L,%L,%L + );', + rec.indexname, + rec.queryable_idx, + dropindexes, + rebuildindexes, + idxconcurrently + ); + RAISE NOTICE 'Q: %s', q; + RETURN NEXT q; END LOOP; RETURN; END; diff --git a/src/pgstac/sql/003b_partitions.sql b/src/pgstac/sql/003b_partitions.sql index 39b62d62..98b242a5 100644 --- a/src/pgstac/sql/003b_partitions.sql +++ b/src/pgstac/sql/003b_partitions.sql @@ -150,7 +150,8 @@ BEGIN REFRESH MATERIALIZED VIEW partition_steps; - RAISE NOTICE 'Checking if we need to modify constraints.'; + RAISE NOTICE 'Checking if we need to modify constraints...'; + RAISE NOTICE 'cdtrange: % dtrange: % cedtrange: % edtrange: %',cdtrange, dtrange, cedtrange, edtrange; IF (cdtrange IS DISTINCT FROM dtrange OR edtrange IS DISTINCT FROM cedtrange) AND NOT istrigger @@ -160,6 +161,8 @@ BEGIN RAISE NOTICE 'New % %', dtrange, edtrange; PERFORM drop_table_constraints(_partition); PERFORM create_table_constraints(_partition, dtrange, edtrange); + REFRESH MATERIALIZED VIEW partitions; + REFRESH MATERIALIZED VIEW partition_steps; END IF; RAISE NOTICE 'Checking if we need to update collection extents.'; IF get_setting_bool('update_collection_extent') THEN @@ -180,7 +183,7 @@ BEGIN END IF; END; -$$ LANGUAGE PLPGSQL STRICT; +$$ LANGUAGE PLPGSQL STRICT SECURITY DEFINER; CREATE OR REPLACE FUNCTION partition_name( IN collection text, IN dt timestamptz, OUT partition_name text, OUT partition_range tstzrange) AS $$ @@ -401,16 +404,19 @@ BEGIN _constraint_edtrange := _edtrange; _constraint_dtrange := _dtrange; END IF; + RAISE NOTICE 'EXISTING CONSTRAINTS % %, NEW % %', pm.constraint_dtrange, pm.constraint_edtrange, _constraint_dtrange, _constraint_edtrange; RAISE NOTICE 'Creating partition % %', _partition_name, _partition_dtrange; IF c.partition_trunc IS NULL THEN q := format( $q$ CREATE TABLE IF NOT EXISTS %I partition OF items FOR VALUES IN (%L); CREATE UNIQUE INDEX IF NOT EXISTS %I ON %I (id); + GRANT ALL ON %I to pgstac_ingest; $q$, _partition_name, _collection, concat(_partition_name,'_pk'), + _partition_name, _partition_name ); ELSE @@ -419,6 +425,7 @@ BEGIN CREATE TABLE IF NOT EXISTS %I partition OF items FOR VALUES IN (%L) PARTITION BY RANGE (datetime); CREATE TABLE IF NOT EXISTS %I partition OF %I FOR VALUES FROM (%L) TO (%L); CREATE UNIQUE INDEX IF NOT EXISTS %I ON %I (id); + GRANT ALL ON %I TO pgstac_ingest; $q$, format('_items_%s', c.key), _collection, @@ -427,6 +434,7 @@ BEGIN lower(_partition_dtrange), upper(_partition_dtrange), format('%s_pk', _partition_name), + _partition_name, _partition_name ); END IF; @@ -442,7 +450,6 @@ BEGIN RAISE INFO 'Error State:%', SQLSTATE; RAISE INFO 'Error Context:%', err_context; END; - PERFORM create_table_constraints(_partition_name, _constraint_dtrange, _constraint_edtrange); PERFORM maintain_partitions(_partition_name); PERFORM update_partition_stats_q(_partition_name, true); REFRESH MATERIALIZED VIEW partitions; @@ -480,11 +487,12 @@ BEGIN WITH p AS ( SELECT collection, - CASE WHEN %L IS NULL THEN '-infinity'::timestamptz - ELSE date_trunc(%L, datetime) + CASE + WHEN %L IS NULL THEN '-infinity'::timestamptz + ELSE date_trunc(%L, datetime) END as d, tstzrange(min(datetime),max(datetime),'[]') as dtrange, - tstzrange(min(datetime),max(datetime),'[]') as edtrange + tstzrange(min(end_datetime),max(end_datetime),'[]') as edtrange FROM changepartitionstaging GROUP BY 1,2 ) SELECT check_partition(collection, dtrange, edtrange) FROM p; diff --git a/src/pgstac/sql/998_idempotent_post.sql b/src/pgstac/sql/998_idempotent_post.sql index d90a6d78..1e0db8af 100644 --- a/src/pgstac/sql/998_idempotent_post.sql +++ b/src/pgstac/sql/998_idempotent_post.sql @@ -50,6 +50,16 @@ ALTER FUNCTION to_int COST 5000; ALTER FUNCTION to_tstz COST 5000; ALTER FUNCTION to_text_array COST 5000; +ALTER FUNCTION update_partition_stats SECURITY DEFINER; +ALTER FUNCTION partition_after_triggerfunc SECURITY DEFINER; +ALTER FUNCTION drop_table_constraints SECURITY DEFINER; +ALTER FUNCTION create_table_constraints SECURITY DEFINER; +ALTER FUNCTION check_partition SECURITY DEFINER; +ALTER FUNCTION repartition SECURITY DEFINER; +ALTER FUNCTION where_stats SECURITY DEFINER; +ALTER FUNCTION search_query SECURITY DEFINER; +ALTER FUNCTION format_item SECURITY DEFINER; +ALTER FUNCTION maintain_index SECURITY DEFINER; GRANT USAGE ON SCHEMA pgstac to pgstac_read; GRANT ALL ON SCHEMA pgstac to pgstac_ingest; @@ -67,4 +77,13 @@ GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA pgstac to pgstac_ingest; GRANT ALL ON ALL TABLES IN SCHEMA pgstac to pgstac_ingest; GRANT USAGE ON ALL SEQUENCES IN SCHEMA pgstac to pgstac_ingest; +REVOKE ALL PRIVILEGES ON PROCEDURE run_queued_queries FROM public; +GRANT ALL ON PROCEDURE run_queued_queries TO pgstac_admin; + +REVOKE ALL PRIVILEGES ON FUNCTION run_queued_queries_intransaction FROM public; +GRANT ALL ON FUNCTION run_queued_queries_intransaction TO pgstac_admin; + +RESET ROLE; + +SET ROLE pgstac_ingest; SELECT update_partition_stats_q(partition) FROM partitions_view; diff --git a/src/pgstac/tests/basic/cql2_searches.sql b/src/pgstac/tests/basic/cql2_searches.sql index ec9127ec..280e16fe 100644 --- a/src/pgstac/tests/basic/cql2_searches.sql +++ b/src/pgstac/tests/basic/cql2_searches.sql @@ -1,3 +1,4 @@ +SET ROLE pgstac_read; SET pgstac."default_filter_lang" TO 'cql2-json'; SELECT search('{"ids":["pgstac-test-item-0097"],"fields":{"include":["id"]}}'); diff --git a/src/pgstac/tests/basic/cql2_searches.sql.out b/src/pgstac/tests/basic/cql2_searches.sql.out index 613bfb08..2cde9881 100644 --- a/src/pgstac/tests/basic/cql2_searches.sql.out +++ b/src/pgstac/tests/basic/cql2_searches.sql.out @@ -1,3 +1,5 @@ +SET ROLE pgstac_read; +SET SET pgstac."default_filter_lang" TO 'cql2-json'; SET SELECT search('{"ids":["pgstac-test-item-0097"],"fields":{"include":["id"]}}'); diff --git a/src/pgstac/tests/basic/partitions.sql b/src/pgstac/tests/basic/partitions.sql index 43badd5c..07019c86 100644 --- a/src/pgstac/tests/basic/partitions.sql +++ b/src/pgstac/tests/basic/partitions.sql @@ -1,3 +1,5 @@ +-- run tests as pgstac_ingest +SET ROLE pgstac_ingest; SET pgstac.use_queue=FALSE; SELECT get_setting_bool('use_queue'); SET pgstac.update_collection_extent=TRUE; diff --git a/src/pgstac/tests/basic/partitions.sql.out b/src/pgstac/tests/basic/partitions.sql.out index a27bc18f..f0eafeef 100644 --- a/src/pgstac/tests/basic/partitions.sql.out +++ b/src/pgstac/tests/basic/partitions.sql.out @@ -1,3 +1,6 @@ +-- run tests as pgstac_ingest +SET ROLE pgstac_ingest; +SET SET pgstac.use_queue=FALSE; SET SELECT get_setting_bool('use_queue'); diff --git a/src/pgstac/tests/pgtap.sql b/src/pgstac/tests/pgtap.sql index 8014526f..de0c7b91 100644 --- a/src/pgstac/tests/pgtap.sql +++ b/src/pgstac/tests/pgtap.sql @@ -17,7 +17,7 @@ CREATE EXTENSION IF NOT EXISTS pgtap; SET SEARCH_PATH TO pgstac, pgtap, public; -- Plan the tests. -SELECT plan(202); +SELECT plan(218); --SELECT * FROM no_plan(); -- Run the tests. @@ -31,6 +31,7 @@ SELECT plan(202); \i tests/pgtap/002a_queryables.sql \i tests/pgtap/003_items.sql \i tests/pgtap/004_search.sql +\i tests/pgtap/004a_collectionsearch.sql \i tests/pgtap/005_tileutils.sql \i tests/pgtap/006_tilesearch.sql \i tests/pgtap/999_version.sql diff --git a/src/pgstac/tests/pgtap/001_core.sql b/src/pgstac/tests/pgtap/001_core.sql index 51f15257..65394ce2 100644 --- a/src/pgstac/tests/pgtap/001_core.sql +++ b/src/pgstac/tests/pgtap/001_core.sql @@ -53,3 +53,13 @@ SELECT lives_ok( ); RESET transaction_read_only; RESET pgstac.readonly; +SELECT is_definer('update_partition_stats'); +SELECT is_definer('partition_after_triggerfunc'); +SELECT is_definer('drop_table_constraints'); +SELECT is_definer('create_table_constraints'); +SELECT is_definer('check_partition'); +SELECT is_definer('repartition'); +SELECT is_definer('where_stats'); +SELECT is_definer('search_query'); +SELECT is_definer('format_item'); +SELECT is_definer('maintain_index'); diff --git a/src/pypgstac/pyproject.toml b/src/pypgstac/pyproject.toml index 9427b335..f98f4390 100644 --- a/src/pypgstac/pyproject.toml +++ b/src/pypgstac/pyproject.toml @@ -25,8 +25,7 @@ dependencies = [ "python-dateutil==2.8.*", "fire==0.4.*", "plpygis==0.2.*", - "pydantic~=2.0", - "pydantic_settings~=2.0", + "pydantic>=1.7", "tenacity==8.1.*", "cachetools==5.3.*", "version-parser>= 1.0.1", diff --git a/src/pypgstac/python/pypgstac/db.py b/src/pypgstac/python/pypgstac/db.py index ea5ca095..e5c4d3d3 100644 --- a/src/pypgstac/python/pypgstac/db.py +++ b/src/pypgstac/python/pypgstac/db.py @@ -10,7 +10,12 @@ from psycopg import Connection, sql from psycopg.types.json import set_json_dumps, set_json_loads from psycopg_pool import ConnectionPool -from pydantic_settings import BaseSettings + +try: + from pydantic.v1 import BaseSettings # type:ignore +except ImportError: + from pydantic import BaseSettings # type:ignore + from tenacity import retry, retry_if_exception_type, stop_after_attempt logger = logging.getLogger(__name__) @@ -57,6 +62,7 @@ def __init__( connection: Optional[Connection] = None, commit_on_exit: bool = True, debug: bool = False, + use_queue: bool = False, ) -> None: """Initialize Database.""" self.dsn: str @@ -69,6 +75,7 @@ def __init__( self.commit_on_exit = commit_on_exit self.initial_version = "0.1.9" self.debug = debug + self.use_queue = use_queue if self.debug: logging.basicConfig(level=logging.DEBUG) @@ -102,6 +109,15 @@ def connect(self) -> Connection: self.connection.autocommit = True if self.debug: self.connection.add_notice_handler(pg_notice_handler) + self.connection.execute( + "SET CLIENT_MIN_MESSAGES TO NOTICE;", + prepare=False, + ) + if self.use_queue: + self.connection.execute( + "SET pgstac.use_queue TO TRUE;", + prepare=False, + ) atexit.register(self.disconnect) self.connection.execute( """ @@ -219,6 +235,15 @@ def query_one(self, *args: Any, **kwargs: Any) -> Union[Tuple, str, None]: return r[0] return r + def run_queued(self) -> str: + try: + self.connect().execute(""" + CALL run_queued_queries(); + """) + return "Ran Queued Queries" + except Exception as e: + return f"Error Running Queued Queries: {e}" + @property def version(self) -> Optional[str]: """Get the current version number from a pgstac database.""" diff --git a/src/pypgstac/python/pypgstac/load.py b/src/pypgstac/python/pypgstac/load.py index 1ce62096..2e9bcc6a 100644 --- a/src/pypgstac/python/pypgstac/load.py +++ b/src/pypgstac/python/pypgstac/load.py @@ -441,7 +441,10 @@ def load_partition( "Available modes are insert, ignore, upsert, and delsert." f"You entered {insert_mode}.", ) + logger.debug("Updating Partition Stats") cur.execute("SELECT update_partition_stats_q(%s);",(partition.name,)) + logger.debug(cur.statusmessage) + logger.debug(f"Rows affected: {cur.rowcount}") logger.debug( f"Copying data for {partition} took {time.perf_counter() - t} seconds", ) diff --git a/src/pypgstac/python/pypgstac/pypgstac.py b/src/pypgstac/python/pypgstac/pypgstac.py index b94c3ec9..1cfbdb38 100644 --- a/src/pypgstac/python/pypgstac/pypgstac.py +++ b/src/pypgstac/python/pypgstac/pypgstac.py @@ -16,14 +16,18 @@ class PgstacCLI: """CLI for PgStac.""" def __init__( - self, dsn: Optional[str] = "", version: bool = False, debug: bool = False, + self, + dsn: Optional[str] = "", + version: bool = False, + debug: bool = False, + usequeue: bool = False, ): """Initialize PgStac CLI.""" if version: sys.exit(0) self.dsn = dsn - self._db = PgstacDB(dsn=dsn, debug=debug) + self._db = PgstacDB(dsn=dsn, debug=debug, use_queue=usequeue) if debug: logging.basicConfig(level=logging.DEBUG) sys.tracebacklimit = 1000 @@ -71,6 +75,9 @@ def load( if table == "items": loader.load_items(file, method, dehydrated, chunksize) + def runqueue(self) -> str: + return self._db.run_queued() + def loadextensions(self) -> None: conn = self._db.connect() diff --git a/src/pypgstac/tests/test_load.py b/src/pypgstac/tests/test_load.py index 49c5d0ef..55006631 100644 --- a/src/pypgstac/tests/test_load.py +++ b/src/pypgstac/tests/test_load.py @@ -419,9 +419,10 @@ def test_load_items_nopartitionconstraint_succeeds(loader: Loader) -> None: cdtmin = loader.db.query_one( """ SELECT lower(constraint_dtrange)::text - FROM partitions WHERE partition = '_items_1'; + FROM partition_sys_meta WHERE partition = '_items_1'; """, ) + assert cdtmin == "2011-07-31 00:00:00+00" with loader.db.connect() as conn: conn.execute( @@ -432,7 +433,7 @@ def test_load_items_nopartitionconstraint_succeeds(loader: Loader) -> None: cdtmin = loader.db.query_one( """ SELECT lower(constraint_dtrange)::text - FROM partitions_view WHERE partition = '_items_1'; + FROM partition_sys_meta WHERE partition = '_items_1'; """, ) assert cdtmin == "-infinity" @@ -441,3 +442,10 @@ def test_load_items_nopartitionconstraint_succeeds(loader: Loader) -> None: str(TEST_ITEMS), insert_mode=Methods.upsert, ) + cdtmin = loader.db.query_one( + """ + SELECT lower(constraint_dtrange)::text + FROM partition_sys_meta WHERE partition = '_items_1'; + """, + ) + assert cdtmin == "2011-07-31 00:00:00+00"