Add support for generating Who Owns What tables and functions (#22)
* Add wowutil.py.

* Fix broken test suite.

* Ensure wow.wow_bldgs() is populated.

* Ensure functions defined by datasets and WoW are available.

* Auto-detect tables generated by WoW.

* Retrieve WoW dependencies and scripts from its YAML file.

* Document wowutil.py in the README.

* Fix typo and add a print statement.
toolness committed Feb 11, 2019
1 parent dce7ac8 commit 1a572c3
Showing 8 changed files with 251 additions and 15 deletions.
7 changes: 7 additions & 0 deletions Dockerfile
@@ -28,6 +28,13 @@ RUN curl -L ${NYCDB_REPO}/archive/${NYCDB_REV}.zip > nyc-db.zip \
   && cd nyc-db/src \
   && pip install -e .
 
+ARG WOW_REPO=https://github.com/justFixNYC/who-owns-what
+ARG WOW_REV=cae0b6de35eba25df3376f6e890312aa55356c98
+RUN curl -L ${WOW_REPO}/archive/${WOW_REV}.zip > wow.zip \
+  && unzip wow.zip \
+  && rm wow.zip \
+  && mv who-owns-what-${WOW_REV} who-owns-what
+
 COPY . /app
 
 WORKDIR /app
23 changes: 23 additions & 0 deletions README.md
@@ -202,6 +202,27 @@ Our continuous integration system will then ensure that everything still
 works, and once the PR is merged into `master`, Docker Hub will re-publish
 a new container image that uses the latest version of NYC-DB.
 
+## Updating Who Owns What data
+
+This repository also contains `wowutil.py`, a tool for creating and
+updating the NYCDB-derived tables and functions required by
+[Who Owns What][] (WoW).
+
+Currently, the tool creates WoW's tables and functions under a
+Postgres schema called `wow`. It is the responsibility of the
+database administrator to set the [Postgres schema search path][]
+to `wow, public` for WoW's functions to work properly.
+
+Unlike the NYCDB datasets, WoW has no tooling to automate the
+*scheduling* of its data updates at the time of this writing.
+
+It is also your responsibility to ensure that WoW's NYCDB
+dependencies are already in the database before `wowutil.py` is
+used to generate the WoW tables and functions.
+
+The specific version of WoW used by `wowutil.py` is specified
+by the `WOW_REV` argument in the `Dockerfile`.
+
 [Cron Jobs]: https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/
 [NYC-DB]: https://github.com/aepyornis/nyc-db
 [Kubernetes Jobs]: https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/
@@ -213,3 +234,5 @@ a new container image that uses the latest version of NYC-DB.
 [Scheduled Tasks]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/scheduled_tasks.html
 [rev]: https://github.com/JustFixNYC/nycdb-k8s-loader/blob/master/Dockerfile#L19
 [Postgres schema]: https://www.postgresql.org/docs/9.5/ddl-schemas.html
+[Who Owns What]: https://github.com/justfixnyc/who-owns-what
+[Postgres schema search path]: https://www.postgresql.org/docs/9.6/ddl-schemas.html#DDL-SCHEMAS-PATH
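To make the search-path requirement above concrete, here is a minimal client sketch, assuming psycopg2, a hypothetical connection URL, and a made-up BBL; the function call mirrors the one exercised in `tests/test_wowutil.py` below:

```python
import psycopg2

# Hypothetical connection URL; substitute your own DATABASE_URL.
with psycopg2.connect('postgres://nycdb:nycdb@localhost/nycdb') as conn:
    with conn.cursor() as cur:
        # Without this, unqualified references inside WoW's functions may
        # fail to resolve, since WoW's tables live in the `wow` schema.
        cur.execute('SET search_path TO wow, public')
        cur.execute("SELECT wow.get_assoc_addrs_from_bbl('3012380016')")  # made-up BBL
        rows = cur.fetchall()
```

A database administrator could instead persist the setting per role with `ALTER ROLE some_role SET search_path TO wow, public` (the role name here is hypothetical).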
10 changes: 7 additions & 3 deletions lib/parse_created_tables.py
@@ -28,12 +28,16 @@ def parse_created_tables(sql: str) -> List[str]:
 
 
 @lru_cache()
-def _parse_nycdb_sql_file(filename: str) -> List[str]:
-    return parse_created_tables((NYCDB_SQL_DIR / filename).read_text())
+def _parse_sql_file(path: Path) -> List[str]:
+    return parse_created_tables(path.read_text())
 
 
 def parse_nycdb_created_tables(filenames: List[str]) -> List[str]:
+    return parse_created_tables_in_dir(NYCDB_SQL_DIR, filenames)
+
+
+def parse_created_tables_in_dir(root_dir: Path, filenames: List[str]) -> List[str]:
     result: List[str] = []
     for filename in filenames:
-        result.extend(_parse_nycdb_sql_file(filename))
+        result.extend(_parse_sql_file(root_dir / filename))
     return result
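As a quick illustration of the refactored API, here is a hedged usage sketch; the SQL text and filename are invented, and the exact parsing rules live in `parse_created_tables()`:

```python
from pathlib import Path
from lib.parse_created_tables import parse_created_tables, parse_created_tables_in_dir

# Invented SQL: the parser scans text for CREATE TABLE statements and
# returns the names of the tables they create.
sql = 'CREATE TABLE wow_bldgs (bbl char(10));'
print(parse_created_tables(sql))  # expected: ['wow_bldgs']

# The new directory-based variant maps the same parser over SQL files
# relative to any root, not just NYCDB's bundled sql/ directory.
# (This filename is hypothetical.)
print(parse_created_tables_in_dir(Path('/who-owns-what/sql'), ['create_bldgs_table.sql']))
```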
51 changes: 51 additions & 0 deletions load_dataset.py
@@ -2,9 +2,11 @@
 import sys
 import contextlib
 import time
+import re
 from pathlib import Path
 from typing import NamedTuple, List
 from types import SimpleNamespace
+import nycdb
 import urllib.parse
 import nycdb.dataset
 from nycdb.dataset import Dataset
@@ -101,6 +103,48 @@ def get_urls_for_dataset(dataset: str) -> List[str]:
     ]
 
 
+def get_all_create_function_sql(root_dir: Path, sql_files: List[str]) -> str:
+    '''
+    Given the SQL files in the given root directory, concatenate the
+    contents of only the ones that contain "CREATE OR REPLACE FUNCTION"
+    SQL statements. It's assumed that these particular SQL files are
+    idempotent.
+    '''
+
+    sqls: List[str] = []
+
+    for sql_file in sql_files:
+        sql = (root_dir / sql_file).read_text()
+        if does_sql_create_functions(sql):
+            sqls.append(sql)
+
+    return '\n'.join(sqls)
+
+
+def get_all_create_function_sql_for_dataset(dataset: str) -> str:
+    return get_all_create_function_sql(
+        root_dir=Path(nycdb.__file__).parent.resolve() / 'sql',
+        sql_files=nycdb.dataset.datasets()[dataset].get('sql', [])
+    )
+
+
+def run_sql_if_nonempty(conn, sql: str, initial_sql: str = ''):
+    if sql:
+        with conn.cursor() as cur:
+            if initial_sql:
+                cur.execute(initial_sql)
+            cur.execute(sql)
+        conn.commit()
+
+
+def collapse_whitespace(text: str) -> str:
+    return re.sub(r'\W+', ' ', text)
+
+
+def does_sql_create_functions(sql: str) -> bool:
+    return 'CREATE OR REPLACE FUNCTION' in collapse_whitespace(sql).upper()
+
+
 def drop_tables_if_they_exist(conn, tables: List[TableInfo], schema: str):
     with conn.cursor() as cur:
         for table in tables:
@@ -226,6 +270,13 @@ def load_dataset(dataset: str, config: Config=Config(), force_check_urls: bool=False):
         with save_and_reapply_permissions(conn, tables, 'public'):
             drop_tables_if_they_exist(conn, tables, 'public')
             change_table_schemas(conn, tables, temp_schema, 'public')
+
+        # The dataset's tables are ready, but any functions defined by the
+        # dataset's custom SQL were in the temporary schema that just got
+        # destroyed. Let's re-run only the function-creating SQL for the
+        # dataset now, in the public schema so that clients can use it.
+        run_sql_if_nonempty(conn, get_all_create_function_sql_for_dataset(dataset))
+
     modtracker.update_lastmods()
     slack.sendmsg(f'Finished loading the dataset `{dataset}` into the database.')
     print("Success!")
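The comment in the final hunk is the heart of this change. The standalone sketch below, with hypothetical names and connection URL, shows the Postgres behavior it works around: moving a table out of a schema rescues it, but any function left behind vanishes when the schema is dropped.

```python
import psycopg2

with psycopg2.connect('postgres://localhost/example') as conn:  # hypothetical URL
    with conn.cursor() as cur:
        cur.execute('CREATE SCHEMA tmp_demo')
        cur.execute('SET search_path TO tmp_demo, public')
        cur.execute('CREATE TABLE t (id int)')
        cur.execute(
            'CREATE OR REPLACE FUNCTION f() RETURNS int AS $$ SELECT 1 $$ LANGUAGE sql'
        )
        cur.execute('ALTER TABLE t SET SCHEMA public')  # the table survives...
        cur.execute('DROP SCHEMA tmp_demo CASCADE')     # ...but f() is gone
        # Re-running the (idempotent) CREATE OR REPLACE FUNCTION SQL in the
        # destination schema, as load_dataset() now does, restores f().
```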
4 changes: 3 additions & 1 deletion tests/conftest.py
@@ -42,11 +42,13 @@ def create_db(dbname):
     exec_outside_of_transaction('CREATE DATABASE ' + dbname)
 
 
-@pytest.fixture(scope="session")
+@pytest.fixture()
 def db():
     """
     Attempt to connect to the database, retrying if necessary, and also
     creating the database if it doesn't already exist.
+
+    This will also ensure the database is empty when the test starts.
     """
 
     retries_left = 5
47 changes: 36 additions & 11 deletions tests/test_load_dataset.py
@@ -6,6 +6,11 @@
 
 from .conftest import DATABASE_URL, make_conn
 import load_dataset
+from load_dataset import (
+    does_sql_create_functions,
+    get_all_create_function_sql_for_dataset,
+    collapse_whitespace
+)
 import dbtool
 
 
@@ -17,13 +22,6 @@ def test_get_dataset_tables_included_derived_tables():
     assert info in load_dataset.get_dataset_tables()
 
 
-def drop_dataset_tables(conn, dataset: str, ok_if_nonexistent: bool):
-    with conn.cursor() as cur:
-        for table in load_dataset.get_tables_for_dataset(dataset):
-            if_exists = 'IF EXISTS' if ok_if_nonexistent else ''
-            cur.execute(f'DROP TABLE {if_exists} {table.name}')
-
-
 def get_row_counts(conn, dataset: str) -> Dict[str, int]:
     tables = [table.name for table in load_dataset.get_tables_for_dataset(dataset)]
     return dict(dbtool.get_rowcounts(conn, tables))
@@ -36,16 +34,21 @@ def test_get_urls_for_dataset_works():
         assert url.startswith('https://')
 
 
+def run_dataset_specific_test_logic(conn, dataset):
+    if dataset == 'hpd_registrations':
+        with conn.cursor() as cur:
+            # Make sure the function defined by the dataset's SQL scripts exists.
+            cur.execute('SELECT get_corporate_owner_info_for_regid(1)')
+
+
 @pytest.mark.parametrize('dataset', nycdb.dataset.datasets().keys())
 def test_load_dataset_works(test_db_env, dataset):
-    with make_conn() as conn:
-        drop_dataset_tables(conn, dataset, ok_if_nonexistent=True)
-
     subprocess.check_call([
         'python', 'load_dataset.py', dataset
     ], env=test_db_env)
 
     with make_conn() as conn:
+        run_dataset_specific_test_logic(conn, dataset)
         table_counts = get_row_counts(conn, dataset)
 
     assert len(table_counts) > 0
@@ -59,8 +62,8 @@ def test_load_dataset_works(test_db_env, dataset):
     ], env=test_db_env)
 
     with make_conn() as conn:
+        run_dataset_specific_test_logic(conn, dataset)
         assert get_row_counts(conn, dataset) == table_counts
-        drop_dataset_tables(conn, dataset, ok_if_nonexistent=False)
 
 
 def test_load_dataset_fails_if_no_dataset_provided(test_db_env):
@@ -128,3 +131,25 @@ def test_unmodified_datasets_are_not_retrieved(db, requests_mock, slack_outbox):
     requests_mock.get(url, text='blah2', headers={'ETag': 'blah2'})
     load()
     assert slack_outbox[0] == 'Downloading the dataset `hpd_registrations`...'
+
+
+def test_does_sql_create_functions_works():
+    assert does_sql_create_functions('\nCREATE OR REPLACE FUNCTION boop()') is True
+    assert does_sql_create_functions('CREATE OR REPLACE \nFUNCTION boop()') is True
+    assert does_sql_create_functions('create or replace \nfunction boop()') is True
+    assert does_sql_create_functions('CREATE OR ZZZ REPLACE FUNCTION boop()') is False
+    assert does_sql_create_functions('') is False
+    assert does_sql_create_functions('CREATE TABLE blarg') is False
+
+
+def test_get_all_create_function_sql_for_dataset_conforms_to_expectations():
+    for dataset in nycdb.dataset.datasets().keys():
+        # Make sure that all the SQL that creates functions *only* creates
+        # functions and doesn't e.g. create tables.
+        sql = get_all_create_function_sql_for_dataset(dataset)
+        assert 'CREATE TABLE' not in collapse_whitespace(sql).upper()
+
+
+def test_get_all_create_function_sql_for_dataset_works():
+    sql = get_all_create_function_sql_for_dataset('hpd_registrations')
+    assert 'CREATE OR REPLACE FUNCTION get_corporate_owner_info_for_regid' in sql
26 changes: 26 additions & 0 deletions tests/test_wowutil.py
@@ -0,0 +1,26 @@
+import psycopg2
+from nycdb.dataset import Dataset
+
+from .conftest import DATABASE_URL
+from load_dataset import Config
+import wowutil
+
+
+def test_it_works(db, slack_outbox):
+    config = Config(database_url=DATABASE_URL, use_test_data=True)
+    for dataset_name in wowutil.WOW_YML['dependencies']:
+        ds = Dataset(dataset_name, args=config.nycdb_args)
+        ds.db_import()
+    wowutil.main(['build'], db_url=DATABASE_URL)
+
+    with psycopg2.connect(DATABASE_URL) as conn:
+        with conn.cursor() as cur:
+            cur.execute("SELECT COUNT(*) FROM wow.wow_bldgs")
+            assert cur.fetchone()[0] > 0
+
+            # Make sure functions are defined, at least.
+            cur.execute("SET search_path TO wow, public")
+            cur.execute("SELECT wow.get_assoc_addrs_from_bbl('blah')")
+
+    assert slack_outbox[0] == 'Rebuilding Who Owns What tables...'
+    assert slack_outbox[1] == 'Finished rebuilding Who Owns What tables.'
98 changes: 98 additions & 0 deletions wowutil.py
@@ -0,0 +1,98 @@
+"""\
+Perform operations on the Who Owns What database tables.
+
+Usage:
+  wowutil.py build
+
+Options:
+  -h --help    Show this screen.
+
+Environment variables:
+  DATABASE_URL    The URL of the NYC-DB and WoW database.
+"""
+
+import sys
+import os
+from pathlib import Path
+from typing import List
+import docopt
+import psycopg2
+import yaml
+
+from lib import slack
+from lib.parse_created_tables import parse_created_tables_in_dir
+from load_dataset import (
+    create_temp_schema_name,
+    create_and_enter_temporary_schema,
+    save_and_reapply_permissions,
+    ensure_schema_exists,
+    drop_tables_if_they_exist,
+    change_table_schemas,
+    run_sql_if_nonempty,
+    get_all_create_function_sql,
+    TableInfo
+)
+
+
+WOW_SCHEMA = 'wow'
+
+WOW_DIR = Path('/who-owns-what')
+
+WOW_SQL_DIR = Path(WOW_DIR / 'sql')
+
+WOW_YML = yaml.load((WOW_DIR / 'who-owns-what.yml').read_text())
+
+WOW_SCRIPTS: List[str] = WOW_YML['sql']
+
+
+def run_wow_sql(conn):
+    with conn.cursor() as cur:
+        for filename in WOW_SCRIPTS:
+            print(f"Running {filename}...")
+            sql = (WOW_SQL_DIR / filename).read_text()
+            cur.execute(sql)
+    conn.commit()
+
+
+def build(db_url: str):
+    slack.sendmsg('Rebuilding Who Owns What tables...')
+
+    cosmetic_dataset_name = 'wow'
+
+    tables = [
+        TableInfo(name=name, dataset=cosmetic_dataset_name)
+        for name in parse_created_tables_in_dir(WOW_SQL_DIR, WOW_SCRIPTS)
+    ]
+
+    with psycopg2.connect(db_url) as conn:
+        temp_schema = create_temp_schema_name(cosmetic_dataset_name)
+        with create_and_enter_temporary_schema(conn, temp_schema):
+            run_wow_sql(conn)
+        ensure_schema_exists(conn, WOW_SCHEMA)
+        with save_and_reapply_permissions(conn, tables, WOW_SCHEMA):
+            drop_tables_if_they_exist(conn, tables, WOW_SCHEMA)
+            change_table_schemas(conn, tables, temp_schema, WOW_SCHEMA)
+
+        # The WoW tables are now ready, but the functions defined by WoW were
+        # in the temporary schema that just got destroyed. Let's re-run only
+        # the function-creating SQL in the WoW schema now.
+        #
+        # Note this means that any client which uses the functions will need
+        # to set their search_path to "{WOW_SCHEMA}, public" or else the function
+        # may not be found or might even crash!
+        print(f"Re-running CREATE FUNCTION statements in the {WOW_SCHEMA} schema...")
+        sql = get_all_create_function_sql(WOW_SQL_DIR, WOW_SCRIPTS)
+        run_sql_if_nonempty(conn, sql, initial_sql=f'SET search_path TO {WOW_SCHEMA}, public')
+
+    slack.sendmsg('Finished rebuilding Who Owns What tables.')
+
+
+def main(argv: List[str], db_url: str):
+    args = docopt.docopt(__doc__, argv=argv)
+
+    if args['build']:
+        build(db_url)
+
+
+if __name__ == '__main__':
+    main(argv=sys.argv[1:], db_url=os.environ['DATABASE_URL'])
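Putting it together, here is a hedged sketch of how a deployment might invoke the new tool, assuming WoW's NYCDB dependencies are already loaded; the connection URL is hypothetical, and the `build` subcommand comes from the docstring above.

```python
import os
import subprocess

# Hypothetical DATABASE_URL; any reachable Postgres with the NYCDB
# dependency tables already loaded should work.
env = dict(os.environ, DATABASE_URL='postgres://nycdb:nycdb@localhost/nycdb')

# Equivalent to calling wowutil.main(['build'], db_url=...) directly,
# as tests/test_wowutil.py does.
subprocess.check_call(['python', 'wowutil.py', 'build'], env=env)
```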
