diff --git a/webservices/tasks/__init__.py b/webservices/tasks/__init__.py index a2dee6ba1..4afe213c9 100755 --- a/webservices/tasks/__init__.py +++ b/webservices/tasks/__init__.py @@ -8,66 +8,71 @@ # Feature and dev are sharing the same RDS box so we only want dev to update schedule = {} -if env.app.get('space_name', 'unknown-space').lower() != 'feature': +if env.app.get("space_name", "unknown-space").lower() != "feature": schedule = { - 'refresh_materialized_views': { - 'task': 'webservices.tasks.refresh.refresh_materialized_views', - 'schedule': crontab(minute=0, hour=9), + # Reload DAILY_MODIFIED_STARTING_AO at 9pm(EST) except Sunday. + "reload_all_aos_daily_except_sunday": { + "task": "webservices.tasks.legal_docs.reload_all_aos_when_change", + "schedule": crontab(minute=0, hour=1, day_of_week="mon,tue,wed,thu,fri,sat"), }, - - 'reload_all_aos_daily_except_sunday': { - 'task': 'webservices.tasks.legal_docs.reload_all_aos_when_change', - 'schedule': crontab(minute=0, hour=1, day_of_week='mon,tue,wed,thu,fri,sat'), + # Reload All AOs every Sunday at 9pm(EST). + "reload_all_aos_every_sunday": { + "task": "webservices.tasks.legal_docs.reload_all_aos", + "schedule": crontab(minute=0, hour=1, day_of_week="sun"), }, - - 'reload_all_aos_every_sunday': { - 'task': 'webservices.tasks.legal_docs.reload_all_aos', - 'schedule': crontab(minute=0, hour=1, day_of_week='sun'), + # Reload RECENTLY_MODIFIED_CASES and RECENTLY_MODIFIED_STARTING_AO + # every 5 minutes during 6am-7pm(EST). + "refresh_legal_docs": { + "task": "webservices.tasks.legal_docs.refresh_most_recent_legal_doc", + "schedule": crontab(minute="*/5", hour="10-23"), }, - - 'refresh_legal_docs': { - 'task': 'webservices.tasks.legal_docs.refresh', - 'schedule': crontab(minute='*/5', hour='10-23'), + # Send modified legal case(during 6am-7pm EST) alerts to Slack every day at 7pm(EST) + "send_alert_legal_case": { + "task": "webservices.tasks.legal_docs.send_alert_most_recent_legal_case", + "schedule": crontab(minute=0, hour=23), }, - - 'backup_elasticsearch_every_sunday': { - 'task': 'webservices.tasks.legal_docs.create_es_backup', - 'schedule': crontab(minute=0, hour=4, day_of_week='sun'), + # Refresh materialized views everyday at 5am(EST). + "refresh_materialized_views": { + "task": "webservices.tasks.refresh.refresh_materialized_views", + "schedule": crontab(minute=0, hour=9), + }, + # Snapshot Elasticsearch at 12am(EST) in Sunday. + "backup_elasticsearch_every_sunday": { + "task": "webservices.tasks.legal_docs.create_es_backup", + "schedule": crontab(minute=0, hour=4, day_of_week="sun"), }, - } def redis_url(): """ Retrieve the URL needed to connect to a Redis instance, depending on environment. - When running in a cloud.gov environment, retrieve the uri credential for the 'aws-elasticache-redis' service. """ # Is the app running in a cloud.gov environment if env.space is not None: - redis_env = env.get_service(label='aws-elasticache-redis') - redis_url = redis_env.credentials.get('uri') + redis_env = env.get_service(label="aws-elasticache-redis") + redis_url = redis_env.credentials.get("uri") return redis_url - return env.get_credential('FEC_REDIS_URL', 'redis://localhost:6379/0') + return env.get_credential("FEC_REDIS_URL", "redis://localhost:6379/0") -app = celery.Celery('openfec') +app = celery.Celery("openfec") app.conf.update( broker_url=redis_url(), broker_use_ssl={ - 'ssl_cert_reqs': ssl.CERT_NONE, + "ssl_cert_reqs": ssl.CERT_NONE, }, redis_backend_use_ssl={ - 'ssl_cert_reqs': ssl.CERT_NONE, + "ssl_cert_reqs": ssl.CERT_NONE, }, imports=( - 'webservices.tasks.refresh', - 'webservices.tasks.download', - 'webservices.tasks.legal_docs', + "webservices.tasks.refresh", + "webservices.tasks.download", + "webservices.tasks.legal_docs", ), beat_schedule=schedule, broker_connection_timeout=30, # in seconds @@ -76,10 +81,10 @@ def redis_url(): ) app.conf.ONCE = { - 'backend': 'celery_once.backends.Redis', - 'settings': { - 'url': redis_url() + "?ssl=true", - 'default_timeout': 60 * 60 + "backend": "celery_once.backends.Redis", + "settings": { + "url": redis_url() + "?ssl=true", + "default_timeout": 60 * 60 } } diff --git a/webservices/tasks/legal_docs.py b/webservices/tasks/legal_docs.py index 3f092f79b..97fa9672d 100644 --- a/webservices/tasks/legal_docs.py +++ b/webservices/tasks/legal_docs.py @@ -38,17 +38,25 @@ ORDER BY case_serial """ +RECENTLY_MODIFIED_CASES_SEND_ALERT = """ + SELECT case_no, case_type, pg_date, published_flg + FROM fecmur.cases_with_parsed_case_serial_numbers_vw + WHERE pg_date >= NOW() - '13 hour'::INTERVAL + ORDER BY case_serial +""" + SLACK_BOTS = "#bots" -@app.task(once={'graceful': True}, base=QueueOnce) -def refresh(): +@app.task(once={"graceful": True}, base=QueueOnce) +def refresh_most_recent_legal_doc(): + # refresh most recently(within 8 hours) modified legal_doc. with db.engine.connect() as conn: - refresh_aos(conn) - refresh_cases(conn) + refresh_most_recent_aos(conn) + refresh_most_recent_cases(conn) -@app.task(once={'graceful': True}, base=QueueOnce) +@app.task(once={"graceful": True}, base=QueueOnce) def reload_all_aos_when_change(): """ Reload all AOs if there were any new or modified AOs found for the past 24 hour period @@ -56,65 +64,86 @@ def reload_all_aos_when_change(): with db.engine.connect() as conn: row = conn.execute(DAILY_MODIFIED_STARTING_AO).first() if row: - logger.info("AO found %s modified at %s", row["ao_no"], row["pg_date"]) - logger.info("Daily (%s) reload of all AOs starting", datetime.date.today().strftime("%A")) + logger.info(" AO found %s modified at %s", row["ao_no"], row["pg_date"]) + logger.info(" Daily (%s) reload of all AOs starting", datetime.date.today().strftime("%A")) load_advisory_opinions() - logger.info("Daily (%s) reload of all AOs completed", datetime.date.today().strftime("%A")) - slack_message = 'Daily reload of all AOs completed in {0} space'.format(get_app_name()) + logger.info(" Daily (%s) reload of all AOs completed", datetime.date.today().strftime("%A")) + slack_message = "Daily reload of all AOs completed in {0} space".format(get_app_name()) + utils.post_to_slack(slack_message, SLACK_BOTS) else: - logger.info("No daily (%s) modified AOs found", datetime.date.today().strftime("%A")) + logger.info(" No daily (%s) modified AOs found", datetime.date.today().strftime("%A")) slack_message = \ - 'No modified AOs found for the day - Reload of all AOs skipped in {0} space'.format(get_app_name()) + "No modified AOs found for the day - Reload of all AOs skipped in {0} space".format(get_app_name()) utils.post_to_slack(slack_message, SLACK_BOTS) -@app.task(once={'graceful': True}, base=QueueOnce) +@app.task(once={"graceful": True}, base=QueueOnce) def reload_all_aos(): - logger.info("Weekly (%s) reload of all AOs starting", datetime.date.today().strftime("%A")) + logger.info(" Weekly (%s) reload of all AOs starting", datetime.date.today().strftime("%A")) load_advisory_opinions() - logger.info("Weekly (%s) reload of all AOs completed", datetime.date.today().strftime("%A")) - slack_message = 'Weekly reload of all AOs completed in {0} space'.format(get_app_name()) + logger.info(" Weekly (%s) reload of all AOs completed", datetime.date.today().strftime("%A")) + slack_message = "Weekly reload of all AOs completed in {0} space".format(get_app_name()) utils.post_to_slack(slack_message, SLACK_BOTS) -@app.task(once={'graceful': True}, base=QueueOnce) +@app.task(once={"graceful": True}, base=QueueOnce) def create_es_backup(): try: - logger.info("Weekly (%s) elasticsearch backup starting", datetime.date.today().strftime("%A")) + logger.info(" Weekly (%s) elasticsearch backup starting", datetime.date.today().strftime("%A")) create_es_snapshot() - logger.info("Weekly (%s) elasticsearch backup completed", datetime.date.today().strftime("%A")) - slack_message = 'Weekly elasticsearch backup completed in {0} space'.format(get_app_name()) + logger.info(" Weekly (%s) elasticsearch backup completed", datetime.date.today().strftime("%A")) + slack_message = "Weekly elasticsearch backup completed in {0} space".format(get_app_name()) utils.post_to_slack(slack_message, SLACK_BOTS) except Exception as error: logger.exception(error) - slack_message = '*ERROR* elasticsearch backup failed for {0}. Check logs.'.format(get_app_name()) + slack_message = "*ERROR* elasticsearch backup failed for {0}. Check logs.".format(get_app_name()) utils.post_to_slack(slack_message, SLACK_BOTS) -def refresh_aos(conn): +def refresh_most_recent_aos(conn): row = conn.execute(RECENTLY_MODIFIED_STARTING_AO).first() if row: - logger.info("AO %s found modified at %s", row["ao_no"], row["pg_date"]) + logger.info(" AO %s found modified at %s", row["ao_no"], row["pg_date"]) load_advisory_opinions(row["ao_no"]) else: - logger.info("No modified AOs found") + logger.info(" No modified AOs found") -def refresh_cases(conn): - logger.info('Checking for modified cases') +def refresh_most_recent_cases(conn): + logger.info(" Checking for modified cases(MUR/AF/ADR)...") rs = conn.execute(RECENTLY_MODIFIED_CASES) if rs.returns_rows: load_count = 0 deleted_case_count = 0 for row in rs: - logger.info("%s %s found modified at %s", row["case_type"], row["case_no"], row["pg_date"]) + logger.info(" %s %s found modified at %s", row["case_type"], row["case_no"], row["pg_date"]) load_cases(row["case_type"], row["case_no"]) if row["published_flg"]: load_count += 1 - logger.info("Total of %d case(s) loaded...", load_count) + logger.info(" Total of %d case(s) loaded...", load_count) else: deleted_case_count += 1 - logger.info("Total of %d case(s) unpublished...", deleted_case_count) + logger.info(" Total of %d case(s) unpublished...", deleted_case_count) else: - logger.info("No modified cases found") + logger.info(" No modified cases found") + + +@app.task(once={"graceful": True}, base=QueueOnce) +def send_alert_most_recent_legal_case(): + # Send modified legal case(during 6am-7pm EST) alerts to Slack every day at 7pm(EST). + slack_message = "" + with db.engine.connect() as conn: + rs = conn.execute(RECENTLY_MODIFIED_CASES_SEND_ALERT) + if rs.returns_rows: + for row in rs: + if row["published_flg"]: + slack_message = slack_message + str(row["case_type"]) + " " + str(row["case_no"]) + " found published at " + str(row["pg_date"]) + slack_message = slack_message + "\n" + else: + slack_message = slack_message + str(row["case_type"]) + " " + str(row["case_no"]) + " found unpublished at " + str(row["pg_date"]) + slack_message = slack_message + "\n" + + if slack_message: + slack_message = slack_message + " in " + get_app_name() + utils.post_to_slack(slack_message, SLACK_BOTS) diff --git a/webservices/tasks/refresh.py b/webservices/tasks/refresh.py index 1344eb960..8f1d59d9a 100644 --- a/webservices/tasks/refresh.py +++ b/webservices/tasks/refresh.py @@ -15,14 +15,14 @@ def refresh_materialized_views(): """Update incremental aggregates, itemized schedules, materialized views, then slack a notification to the development team. """ - manage.logger.info('Starting nightly refresh...') + manage.logger.info('Starting nightly refresh materialized views...') try: manage.refresh_materialized() download.clear_bucket() - slack_message = '*Success* nightly updates for {0} completed'.format(get_app_name()) + slack_message = '*Success* nightly update materialized views for {0} completed'.format(get_app_name()) utils.post_to_slack(slack_message, '#bots') manage.logger.info(slack_message) except Exception as error: manage.logger.exception(error) - slack_message = '*ERROR* nightly update failed for {0}. Check logs.'.format(get_app_name()) + slack_message = '*ERROR* nightly update materialized views failed for {0}. Check logs.'.format(get_app_name()) utils.post_to_slack(slack_message, '#bots') diff --git a/webservices/tasks/utils.py b/webservices/tasks/utils.py index 08f412dda..88f9b04a3 100644 --- a/webservices/tasks/utils.py +++ b/webservices/tasks/utils.py @@ -47,4 +47,4 @@ def get_json_data(response): def get_app_name(): - return env.get_credential("APP_NAME") + return env.get_credential("APP_NAME") if env.get_credential("APP_NAME") is not None else "fec | api | local"