diff --git a/Dockerfile b/Dockerfile index 366d5f1..cb139ea 100644 --- a/Dockerfile +++ b/Dockerfile @@ -28,6 +28,13 @@ RUN curl -L ${NYCDB_REPO}/archive/${NYCDB_REV}.zip > nyc-db.zip \ && cd nyc-db/src \ && pip install -e . +ARG WOW_REPO=https://github.com/justFixNYC/who-owns-what +ARG WOW_REV=cae0b6de35eba25df3376f6e890312aa55356c98 +RUN curl -L ${WOW_REPO}/archive/${WOW_REV}.zip > wow.zip \ + && unzip wow.zip \ + && rm wow.zip \ + && mv who-owns-what-${WOW_REV} who-owns-what + COPY . /app WORKDIR /app diff --git a/README.md b/README.md index 8e0f7cc..99c7784 100644 --- a/README.md +++ b/README.md @@ -202,6 +202,27 @@ Our continuous integration system will then ensure that everything still works, and once the PR is merged into `master`, Docker Hub will re-publish a new container image that uses the latest version of NYC-DB. +## Updating Who Owns What data + +This repository also contains `wowutil.py`, a tool for creating and +updating the NYCDB-derived tables and functions required by +[Who Owns What][] (WoW). + +Currently, the tool creates WoW's tables and functions under a +Postgres schema called `wow`. It's the responsibility of the +database administrator to set the [Postgres schema search path][] +to `wow, public` for WoW's functions to work properly. + +Unlike the NYCDB datasets, at the time of this writing, there are +no tools to automate the *scheduling* of WoW data updates. + +It is also your responsibility to ensure that the NYCDB dependencies +of the WoW data are already in the database at the time that +`wowutil.py` is used to generate the WoW tables and functions. + +The specific version of WoW used by `wowutil.py` is specified +by the `WOW_REV` argument in the `Dockerfile`. + [Cron Jobs]: https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/ [NYC-DB]: https://github.com/aepyornis/nyc-db [Kubernetes Jobs]: https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/ @@ -213,3 +234,5 @@ a new container image that uses the latest version of NYC-DB. [Scheduled Tasks]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/scheduled_tasks.html [rev]: https://github.com/JustFixNYC/nycdb-k8s-loader/blob/master/Dockerfile#L19 [Postgres schema]: https://www.postgresql.org/docs/9.5/ddl-schemas.html +[Who Owns What]: https://github.com/justfixnyc/who-owns-what +[Postgres schema search path]: https://www.postgresql.org/docs/9.6/ddl-schemas.html#DDL-SCHEMAS-PATH diff --git a/lib/parse_created_tables.py b/lib/parse_created_tables.py index 4f4bbe8..7c7098c 100644 --- a/lib/parse_created_tables.py +++ b/lib/parse_created_tables.py @@ -28,12 +28,16 @@ def parse_created_tables(sql: str) -> List[str]: @lru_cache() -def _parse_nycdb_sql_file(filename: str) -> List[str]: - return parse_created_tables((NYCDB_SQL_DIR / filename).read_text()) +def _parse_sql_file(path: Path) -> List[str]: + return parse_created_tables(path.read_text()) def parse_nycdb_created_tables(filenames: List[str]) -> List[str]: + return parse_created_tables_in_dir(NYCDB_SQL_DIR, filenames) + + +def parse_created_tables_in_dir(root_dir: Path, filenames: List[str]) -> List[str]: result: List[str] = [] for filename in filenames: - result.extend(_parse_nycdb_sql_file(filename)) + result.extend(_parse_sql_file(root_dir / filename)) return result diff --git a/load_dataset.py b/load_dataset.py index 0f17e59..35ef27c 100644 --- a/load_dataset.py +++ b/load_dataset.py @@ -2,9 +2,11 @@ import sys import contextlib import time +import re from pathlib import Path from typing import NamedTuple, List from types import SimpleNamespace +import nycdb import urllib.parse import nycdb.dataset from nycdb.dataset import Dataset @@ -101,6 +103,48 @@ def get_urls_for_dataset(dataset: str) -> List[str]: ] +def get_all_create_function_sql(root_dir: Path, sql_files: List[str]) -> str: + ''' + Given the SQL files in the given root directory, concatenate the + contents of only the ones that contain "CREATE OR REPLACE FUNCTION" + SQL statements. It's assumed that these particular SQL files are + idempotent. + ''' + + sqls: List[str] = [] + + for sql_file in sql_files: + sql = (root_dir / sql_file).read_text() + if does_sql_create_functions(sql): + sqls.append(sql) + + return '\n'.join(sqls) + + +def get_all_create_function_sql_for_dataset(dataset: str) -> str: + return get_all_create_function_sql( + root_dir=Path(nycdb.__file__).parent.resolve() / 'sql', + sql_files=nycdb.dataset.datasets()[dataset].get('sql', []) + ) + + +def run_sql_if_nonempty(conn, sql: str, initial_sql: str = ''): + if sql: + with conn.cursor() as cur: + if initial_sql: + cur.execute(initial_sql) + cur.execute(sql) + conn.commit() + + +def collapse_whitespace(text: str) -> str: + return re.sub(r'\W+', ' ', text) + + +def does_sql_create_functions(sql: str) -> bool: + return 'CREATE OR REPLACE FUNCTION' in collapse_whitespace(sql).upper() + + def drop_tables_if_they_exist(conn, tables: List[TableInfo], schema: str): with conn.cursor() as cur: for table in tables: @@ -226,6 +270,13 @@ def load_dataset(dataset: str, config: Config=Config(), force_check_urls: bool=F with save_and_reapply_permissions(conn, tables, 'public'): drop_tables_if_they_exist(conn, tables, 'public') change_table_schemas(conn, tables, temp_schema, 'public') + + # The dataset's tables are ready, but any functions defined by the + # dataset's custom SQL were in the temporary schema that just got + # destroyed. Let's re-run only the function-creating SQL for the + # dataset now, in the public schema so that clients can use it. + run_sql_if_nonempty(conn, get_all_create_function_sql_for_dataset(dataset)) + modtracker.update_lastmods() slack.sendmsg(f'Finished loading the dataset `{dataset}` into the database.') print("Success!") diff --git a/tests/conftest.py b/tests/conftest.py index be48cd7..b48b0a3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -42,11 +42,13 @@ def create_db(dbname): exec_outside_of_transaction('CREATE DATABASE ' + dbname) -@pytest.fixture(scope="session") +@pytest.fixture() def db(): """ Attempt to connect to the database, retrying if necessary, and also creating the database if it doesn't already exist. + + This will also ensure the database is empty when the test starts. """ retries_left = 5 diff --git a/tests/test_load_dataset.py b/tests/test_load_dataset.py index 4700055..f92a477 100644 --- a/tests/test_load_dataset.py +++ b/tests/test_load_dataset.py @@ -6,6 +6,11 @@ from .conftest import DATABASE_URL, make_conn import load_dataset +from load_dataset import ( + does_sql_create_functions, + get_all_create_function_sql_for_dataset, + collapse_whitespace +) import dbtool @@ -17,13 +22,6 @@ def test_get_dataset_tables_included_derived_tables(): assert info in load_dataset.get_dataset_tables() -def drop_dataset_tables(conn, dataset: str, ok_if_nonexistent: bool): - with conn.cursor() as cur: - for table in load_dataset.get_tables_for_dataset(dataset): - if_exists = 'IF EXISTS' if ok_if_nonexistent else '' - cur.execute(f'DROP TABLE {if_exists} {table.name}') - - def get_row_counts(conn, dataset: str) -> Dict[str, int]: tables = [table.name for table in load_dataset.get_tables_for_dataset(dataset)] return dict(dbtool.get_rowcounts(conn, tables)) @@ -36,16 +34,21 @@ def test_get_urls_for_dataset_works(): assert url.startswith('https://') +def run_dataset_specific_test_logic(conn, dataset): + if dataset == 'hpd_registrations': + with conn.cursor() as cur: + # Make sure the function defined by the dataset's SQL scripts exists. + cur.execute('SELECT get_corporate_owner_info_for_regid(1)') + + @pytest.mark.parametrize('dataset', nycdb.dataset.datasets().keys()) def test_load_dataset_works(test_db_env, dataset): - with make_conn() as conn: - drop_dataset_tables(conn, dataset, ok_if_nonexistent=True) - subprocess.check_call([ 'python', 'load_dataset.py', dataset ], env=test_db_env) with make_conn() as conn: + run_dataset_specific_test_logic(conn, dataset) table_counts = get_row_counts(conn, dataset) assert len(table_counts) > 0 @@ -59,8 +62,8 @@ def test_load_dataset_works(test_db_env, dataset): ], env=test_db_env) with make_conn() as conn: + run_dataset_specific_test_logic(conn, dataset) assert get_row_counts(conn, dataset) == table_counts - drop_dataset_tables(conn, dataset, ok_if_nonexistent=False) def test_load_dataset_fails_if_no_dataset_provided(test_db_env): @@ -128,3 +131,25 @@ def test_unmodified_datasets_are_not_retrieved(db, requests_mock, slack_outbox): requests_mock.get(url, text='blah2', headers={'ETag': 'blah2'}) load() assert slack_outbox[0] == 'Downloading the dataset `hpd_registrations`...' + + +def test_does_sql_create_functions_works(): + assert does_sql_create_functions('\nCREATE OR REPLACE FUNCTION boop()') is True + assert does_sql_create_functions('CREATE OR REPLACE \nFUNCTION boop()') is True + assert does_sql_create_functions('create or replace \nfunction boop()') is True + assert does_sql_create_functions('CREATE OR ZZZ REPLACE FUNCTION boop()') is False + assert does_sql_create_functions('') is False + assert does_sql_create_functions('CREATE TABLE blarg') is False + + +def test_get_all_create_function_sql_for_dataset_conforms_to_expectations(): + for dataset in nycdb.dataset.datasets().keys(): + # Make sure that all the SQL that creates functions *only* creates + # functions and doesn't e.g. create tables. + sql = get_all_create_function_sql_for_dataset(dataset) + assert 'CREATE TABLE' not in collapse_whitespace(sql).upper() + + +def test_get_all_create_function_sql_for_dataset_works(): + sql = get_all_create_function_sql_for_dataset('hpd_registrations') + assert 'CREATE OR REPLACE FUNCTION get_corporate_owner_info_for_regid' in sql diff --git a/tests/test_wowutil.py b/tests/test_wowutil.py new file mode 100644 index 0000000..feeb4b7 --- /dev/null +++ b/tests/test_wowutil.py @@ -0,0 +1,26 @@ +import psycopg2 +from nycdb.dataset import Dataset + +from .conftest import DATABASE_URL +from load_dataset import Config +import wowutil + + +def test_it_works(db, slack_outbox): + config = Config(database_url=DATABASE_URL, use_test_data=True) + for dataset_name in wowutil.WOW_YML['dependencies']: + ds = Dataset(dataset_name, args=config.nycdb_args) + ds.db_import() + wowutil.main(['build'], db_url=DATABASE_URL) + + with psycopg2.connect(DATABASE_URL) as conn: + with conn.cursor() as cur: + cur.execute("SELECT COUNT(*) FROM wow.wow_bldgs") + assert cur.fetchone()[0] > 0 + + # Make sure functions are defined, at least. + cur.execute("SET search_path TO wow, public") + cur.execute("SELECT wow.get_assoc_addrs_from_bbl('blah')") + + assert slack_outbox[0] == 'Rebuilding Who Owns What tables...' + assert slack_outbox[1] == 'Finished rebuilding Who Owns What tables.' diff --git a/wowutil.py b/wowutil.py new file mode 100644 index 0000000..4cfd2e5 --- /dev/null +++ b/wowutil.py @@ -0,0 +1,98 @@ +"""\ +Perform operations on the Who Owns What database tables. + +Usage: + wowutil.py build + +Options: + -h --help Show this screen. + +Environment variables: + DATABASE_URL The URL of the NYC-DB and WoW database. +""" + +import sys +import os +from pathlib import Path +from typing import List +import docopt +import psycopg2 +import yaml + +from lib import slack +from lib.parse_created_tables import parse_created_tables_in_dir +from load_dataset import ( + create_temp_schema_name, + create_and_enter_temporary_schema, + save_and_reapply_permissions, + ensure_schema_exists, + drop_tables_if_they_exist, + change_table_schemas, + run_sql_if_nonempty, + get_all_create_function_sql, + TableInfo +) + + +WOW_SCHEMA = 'wow' + +WOW_DIR = Path('/who-owns-what') + +WOW_SQL_DIR = Path(WOW_DIR / 'sql') + +WOW_YML = yaml.load((WOW_DIR / 'who-owns-what.yml').read_text()) + +WOW_SCRIPTS: List[str] = WOW_YML['sql'] + + +def run_wow_sql(conn): + with conn.cursor() as cur: + for filename in WOW_SCRIPTS: + print(f"Running {filename}...") + sql = (WOW_SQL_DIR / filename).read_text() + cur.execute(sql) + conn.commit() + + +def build(db_url: str): + slack.sendmsg('Rebuilding Who Owns What tables...') + + cosmetic_dataset_name = 'wow' + + tables = [ + TableInfo(name=name, dataset=cosmetic_dataset_name) + for name in parse_created_tables_in_dir(WOW_SQL_DIR, WOW_SCRIPTS) + ] + + with psycopg2.connect(db_url) as conn: + temp_schema = create_temp_schema_name(cosmetic_dataset_name) + with create_and_enter_temporary_schema(conn, temp_schema): + run_wow_sql(conn) + ensure_schema_exists(conn, WOW_SCHEMA) + with save_and_reapply_permissions(conn, tables, WOW_SCHEMA): + drop_tables_if_they_exist(conn, tables, WOW_SCHEMA) + change_table_schemas(conn, tables, temp_schema, WOW_SCHEMA) + + # The WoW tables are now ready, but the functions defined by WoW were + # in the temporary schema that just got destroyed. Let's re-run only + # the function-creating SQL in the WoW schema now. + # + # Note this means that any client which uses the functions will need + # to set their search_path to "{WOW_SCHEMA}, public" or else the function + # may not be found or might even crash! + print(f"Re-running CREATE FUNCTION statements in the {WOW_SCHEMA} schema...") + sql = get_all_create_function_sql(WOW_SQL_DIR, WOW_SCRIPTS) + run_sql_if_nonempty(conn, sql, initial_sql=f'SET search_path TO {WOW_SCHEMA}, public') + + slack.sendmsg('Finished rebuilding Who Owns What tables.') + + +def main(argv: List[str], db_url: str): + args = docopt.docopt(__doc__, argv=argv) + + if args['build']: + build(db_url) + + +if __name__ == '__main__': + main(argv=sys.argv[1:], db_url=os.environ['DATABASE_URL'])