Release December updates
Includes:
#653
#654
#656
#657
#658
#662
#666
#665
#668
#667
#678
and December dependency updates
Mr0grog committed Dec 2, 2020
2 parents 8e1b69b + ae6f53d commit 89d8819
Showing 25 changed files with 4,016 additions and 173 deletions.
dev-requirements.txt: 4 changes (2 additions, 2 deletions)
@@ -5,8 +5,8 @@ pytest ~=6.1.2
vcrpy ~=4.1.1
doctr ~=1.8.0
ipython ~=7.19.0
matplotlib ~=3.3.2
matplotlib ~=3.3.3
numpydoc ~=1.1.0
requests-mock ~=1.8.0
sphinx ~=3.2.1
sphinx ~=3.3.1
sphinx_rtd_theme ~=0.5.0
docs/source/db_api.rst: 8 changes (4 additions, 4 deletions)
@@ -20,14 +20,14 @@ API Documentation

Client
Client.from_env
Client.list_pages
Client.get_pages
Client.get_page
Client.list_versions
Client.get_versions
Client.get_version
Client.add_version
Client.add_versions
Client.list_changes
Client.get_changes
Client.get_change
Client.list_annotations
Client.get_annotations
Client.get_annotation
Client.add_annotation
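
As a quick illustration of the renamed methods above, here is a hedged sketch (not part of this commit): the keyword arguments mirror how get_pages and get_versions are called in cli.py later in this diff, the module path web_monitoring.db is assumed from what db_api.rst documents, and the argument values are examples only.

from web_monitoring import db

client = db.Client.from_env()

# get_pages() and get_versions() replace list_pages()/list_versions() and are
# iterated over directly (see _get_db_page_url_info and _load_known_versions
# in cli.py below). All argument values here are illustrative.
for page in client.get_pages(active=True, sort=['created_at:asc'],
                             chunk_size=1000):
    print(page['url'])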
requirements.txt: 10 changes (5 additions, 5 deletions)
@@ -6,12 +6,12 @@ docopt ~=0.6.2
git+https://github.com/anastasia/htmldiffer@develop
git+https://github.com/danielballan/htmltreediff@customize
html5-parser ~=0.4.9 --no-binary lxml
lxml ~=4.6.1
lxml ~=4.6.2
pycurl ~=7.43
PyPDF2 ~=1.26.0
sentry-sdk ~=0.19.1
requests ~=2.24.0
sentry-sdk ~=0.19.4
requests ~=2.25.0
toolz ~=0.11.1
tornado ~=6.1
tqdm ~=4.51.0
wayback ~=0.3.0a1
tqdm ~=4.54.0
wayback ~=0.3.0a3
web_monitoring/cli/cli.py: 139 changes (98 additions, 41 deletions)
@@ -38,9 +38,11 @@
"""

from collections import defaultdict
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from web_monitoring.utils import detect_encoding
import dateutil.parser
from docopt import docopt
from itertools import islice
import json
import logging
import os
@@ -146,6 +148,9 @@
'application/x-pdf',
))

# Identifies a bare media type (that is, one without parameters)
MEDIA_TYPE_EXPRESSION = re.compile(r'^\w+/\w[\w+_\-.]+$')


# These functions lump together library code into monolithic operations for the
# CLI. They also print. To access this functionality programmatically, it is
@@ -243,7 +248,7 @@ class WaybackRecordsWorker(threading.Thread):

def __init__(self, records, results_queue, maintainers, tags, cancel,
failure_queue=None, session_options=None, adapter=None,
unplaybackable=None):
unplaybackable=None, version_cache=None):
super().__init__()
self.summary = self.create_summary()
self.results_queue = results_queue
@@ -253,6 +258,7 @@ def __init__(self, records, results_queue, maintainers, tags, cancel,
self.maintainers = maintainers
self.tags = tags
self.unplaybackable = unplaybackable
self.version_cache = version_cache or set()
self.adapter = adapter
session_options = session_options or dict(retries=3, backoff=2,
timeout=(30.5, 2))
@@ -277,32 +283,42 @@ def run(self):
except StopIteration:
break

self.handle_record(record, retry_connection_failures=True)
self.handle_record(record)

# Only close the client if it's using an adapter we created, instead of
# one some other piece of code owns.
if not self.adapter:
self.wayback.close()
return self.summary

def handle_record(self, record, retry_connection_failures=False):
def handle_record(self, record):
"""
Handle a single CDX record.
"""
# Check whether we already have this memento and bail out.
if _version_cache_key(record.timestamp, record.url) in self.version_cache:
self.summary['already_known'] += 1
return
# Check for whether we already know this can't be played and bail out.
if self.unplaybackable is not None and record.raw_url in self.unplaybackable:
self.summary['playback'] += 1
return

try:
version = self.process_record(record, retry_connection_failures=True)
version = self.process_record(record)
self.results_queue.put(version)
self.summary['success'] += 1
except MementoPlaybackError as error:
self.summary['playback'] += 1
if self.unplaybackable is not None:
self.unplaybackable[record.raw_url] = datetime.utcnow()
logger.info(f' {error}')
# Playback errors are not unusual or exceptional for us, so log
# only at debug level. The Wayback Machine marks some mementos as
# unplaybackable when there are many of them in a short timeframe
# in order to increase cache efficiency (the assumption they make
# here is that the mementos are likely the same). Since we are
# looking at highly monitored, public URLs, we hit this case a lot.
logger.debug(f' {error}')
except requests.exceptions.HTTPError as error:
if error.response.status_code == 404:
logger.info(f' Missing memento: {record.raw_url}')
@@ -335,7 +351,7 @@ def handle_record(self, record, retry_connection_failures=False):
else:
self.summary['unknown'] += 1

def process_record(self, record, retry_connection_failures=False):
def process_record(self, record):
"""
Load the actual Wayback memento for a CDX record and transform it to
a Web Monitoring DB import record.
@@ -372,9 +388,10 @@ def format_memento(self, memento, cdx_record, maintainers, tags):

title = ''
if media_type in HTML_MEDIA_TYPES:
title = utils.extract_title(memento.content, memento.encoding or 'utf-8')
encoding = detect_encoding(memento.content, memento.headers)
title = utils.extract_title(memento.content, encoding)
elif media_type in PDF_MEDIA_TYPES or memento.content.startswith(b'%PDF-'):
title = utils.extract_pdf_title(memento.content)
title = utils.extract_pdf_title(memento.content) or title

return dict(
# Page-level info
@@ -387,7 +404,6 @@ def format_memento(self, memento, cdx_record, maintainers, tags):
capture_time=iso_date,
uri=cdx_record.raw_url,
media_type=media_type or None,
media_type_parameters=media_type_parameters or None,
version_hash=utils.hash_content(memento.content),
source_type='internet_archive',
source_metadata=metadata,
@@ -398,6 +414,13 @@ def get_memento_media(self, memento):
"""Extract media type and media type parameters from a memento."""
media, *parameters = memento.headers.get('Content-Type', '').split(';')

# Clean up media type
media = media.strip().lower()
if not MEDIA_TYPE_EXPRESSION.match(media):
original = memento.history[0] if memento.history else memento
logger.info('Unknown media type "%s" for "%s"', media, original.memento_url)
media = ''

# Clean up whitespace, remove empty parameters, etc.
clean_parameters = (param.strip() for param in parameters)
parameters = [param for param in clean_parameters if param]
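
To make the cleanup above concrete, here is a small, self-contained sketch; the split_media_type helper and the sample header values are hypothetical, and the real get_memento_media() also logs unknown types and returns them alongside the rest of the memento metadata, which this hunk does not show.

import re

# Same pattern as MEDIA_TYPE_EXPRESSION above: a bare media type, no parameters.
BARE_MEDIA_TYPE = re.compile(r'^\w+/\w[\w+_\-.]+$')

def split_media_type(content_type):
    """Split a Content-Type header roughly the way get_memento_media() does."""
    media, *parameters = content_type.split(';')
    media = media.strip().lower()
    if not BARE_MEDIA_TYPE.match(media):
        media = ''  # unrecognized media types are dropped (and logged in the real code)
    clean_parameters = (param.strip() for param in parameters)
    return media, [param for param in clean_parameters if param]

split_media_type('text/HTML; charset=UTF-8')  # -> ('text/html', ['charset=UTF-8'])
split_media_type('bad media type')            # -> ('', [])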
@@ -411,8 +434,8 @@ def create_summary(cls):
Create a dictionary that summarizes the results of processing all the
CDX records on a queue.
"""
return {'total': 0, 'success': 0, 'playback': 0, 'missing': 0,
'unknown': 0}
return {'total': 0, 'success': 0, 'already_known': 0, 'playback': 0,
'missing': 0, 'unknown': 0}

@classmethod
def summarize(cls, workers, initial=None):
@@ -522,10 +545,15 @@ def parallel_with_retries(cls, count, summary, records, results_queue, *args, tr
retry_queue = None
workers = []
for index, try_setting in enumerate(tries):
if retry_queue and not retry_queue.empty():
print(f'\nRetrying about {retry_queue.qsize()} failed records...', flush=True)
retry_queue.end()
records = retry_queue
if retry_queue:
if retry_queue.empty():
# We can only get here if we are on a retry run, but there's
# nothing to retry, so we may as well stop.
break
else:
print(f'\nRetrying about {retry_queue.qsize()} failed records...', flush=True)
retry_queue.end()
records = retry_queue

if index == total_tries - 1:
retry_queue = None
Expand All @@ -537,16 +565,39 @@ def parallel_with_retries(cls, count, summary, records, results_queue, *args, tr
results_queue,
*args,
failure_queue=retry_queue,
session_options=try_setting,
**kwargs))

summary.update(cls.summarize(workers, summary))
results_queue.end()


def _version_cache_key(time, url):
utc_time = time.astimezone(timezone.utc)
return f'{utc_time.strftime("%Y%m%d%H%M%S")}|{url}'


def _load_known_versions(client, start_date, end_date):
print('Pre-checking known versions...', flush=True)

versions = client.get_versions(start_date=start_date,
end_date=end_date,
different=False, # Get *every* record
sort=['capture_time:desc'],
chunk_size=1000)
# Limit to latest 500,000 results for sanity/time/memory
limited_versions = islice(versions, 500_000)
cache = set(_version_cache_key(v["capture_time"], v["capture_url"])
for v in limited_versions)
logger.debug(f' Found {len(cache)} known versions')
return cache
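
For clarity, a brief sketch of the cache key format these two helpers produce; the capture time and URL below are made-up examples. Keys are normalized to UTC at second precision so the timestamp of a CDX record can be compared directly against versions already in the database.

from datetime import datetime, timezone

def _version_cache_key(time, url):  # copied from above so this runs standalone
    utc_time = time.astimezone(timezone.utc)
    return f'{utc_time.strftime("%Y%m%d%H%M%S")}|{url}'

capture_time = datetime(2020, 11, 15, 8, 30, 0, tzinfo=timezone.utc)
_version_cache_key(capture_time, 'https://example.gov/page')
# -> '20201115083000|https://example.gov/page'

# handle_record() skips a CDX record when its key is already cached:
#     _version_cache_key(record.timestamp, record.url) in self.version_cache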


def import_ia_db_urls(*, from_date=None, to_date=None, maintainers=None,
tags=None, skip_unchanged='resolved-response',
url_pattern=None, worker_count=0,
unplaybackable_path=None, dry_run=False):
unplaybackable_path=None, dry_run=False,
precheck_versions=False):
client = db.Client.from_env()
logger.info('Loading known pages from web-monitoring-db instance...')
urls, version_filter = _get_db_page_url_info(client, url_pattern)
@@ -558,6 +609,12 @@ def import_ia_db_urls(*, from_date=None, to_date=None, maintainers=None,
logger.info(f'Found {len(urls)} CDX-queryable URLs')
logger.debug('\n '.join(urls))

version_cache = None
if precheck_versions:
version_cache = _load_known_versions(client,
start_date=from_date,
end_date=to_date)

return import_ia_urls(
urls=urls,
from_date=from_date,
@@ -570,7 +627,8 @@ def import_ia_db_urls(*, from_date=None, to_date=None, maintainers=None,
create_pages=False,
unplaybackable_path=unplaybackable_path,
db_client=client,
dry_run=dry_run)
dry_run=dry_run,
version_cache=version_cache)
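
A hedged sketch of calling the importer programmatically with the new pre-check enabled; only the keyword names come from the signature above, every argument value is a placeholder, and on the command line the equivalent switch is --precheck.

from datetime import datetime, timezone
from web_monitoring.cli.cli import import_ia_db_urls

import_ia_db_urls(
    from_date=datetime(2020, 11, 1, tzinfo=timezone.utc),  # placeholder dates
    to_date=datetime(2020, 12, 1, tzinfo=timezone.utc),
    maintainers=['example-maintainer'],                     # placeholder
    tags=['example-import'],                                # placeholder
    precheck_versions=True,  # new: load known versions first and skip duplicates
    dry_run=True)            # don't upload anything to web-monitoring-db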


# TODO: this function should probably be split apart so `dry_run` doesn't need to
@@ -580,9 +638,10 @@ def import_ia_urls(urls, *, from_date=None, to_date=None,
skip_unchanged='resolved-response',
version_filter=None, worker_count=0,
create_pages=True, unplaybackable_path=None,
db_client=None, dry_run=False):
if not all(_is_valid(url) for url in urls):
raise ValueError("Invalid URL provided")
db_client=None, dry_run=False, version_cache=None):
for url in urls:
if not _is_valid(url):
raise ValueError(f'Invalid URL: "{url}"')

worker_count = worker_count if worker_count > 0 else PARALLEL_REQUESTS
unplaybackable = load_unplaybackable_mementos(unplaybackable_path)
@@ -614,8 +673,11 @@ def import_ia_urls(urls, *, from_date=None, to_date=None,
tags,
stop_event,
unplaybackable=unplaybackable,
version_cache=version_cache,
# Use the default retries on the first round, then no retries with
# *really* long timeouts on the second, final round.
tries=(None,
dict(retries=3, backoff=4, timeout=(30.5, 2)))))
dict(retries=0, backoff=1, timeout=(120, 60)))))
memento_thread.start()

uploadable_versions = versions_queue
@@ -631,16 +693,16 @@ def import_ia_urls(urls, *, from_date=None, to_date=None,
memento_thread.join()

print('\nLoaded {total} CDX records:\n'
' {success:6} successes ({success_pct:.2f}%),\n'
' {playback:6} could not be played back ({playback_pct:.2f}%),\n'
' {missing:6} had no actual memento ({missing_pct:.2f}%),\n'
' {unknown:6} unknown errors ({unknown_pct:.2f}%).'.format(
' {success:6} successes ({success_pct:.2f}%)\n'
' {already_known:6} skipped - already in DB ({already_known_pct:.2f}%)\n'
' {playback:6} could not be played back ({playback_pct:.2f}%)\n'
' {missing:6} had no actual memento ({missing_pct:.2f}%)\n'
' {unknown:6} unknown errors ({unknown_pct:.2f}%)'.format(
**summary))

uploader.join()

if not dry_run:
print('Saving list of non-playbackable URLs...')
save_unplaybackable_mementos(unplaybackable_path, unplaybackable)

if summary['success'] == 0:
@@ -725,6 +787,8 @@ def save_unplaybackable_mementos(path, mementos, expiration=7 * 24 * 60 * 60):
if path is None:
return

print('Saving list of non-playbackable URLs...')

threshold = datetime.utcnow() - timedelta(seconds=expiration)
urls = list(mementos.keys())
for url in urls:
@@ -762,7 +826,9 @@ def _get_db_page_url_info(client, url_pattern=None):
domains = defaultdict(lambda: {'query_domain': False, 'urls': []})

domains_without_url_keys = set()
for page in _list_all_db_pages(client, url_pattern):
pages = client.get_pages(url=url_pattern, active=True,
sort=['created_at:asc'], chunk_size=1000)
for page in pages:
domain = HOST_EXPRESSION.match(page['url']).group(1)
data = domains[domain]
if not data['query_domain']:
@@ -823,18 +889,6 @@ def _is_page(version):
splitext(urlparse(version.url).path)[1] not in SUBRESOURCE_EXTENSIONS)


# TODO: this should probably be a method on db.Client, but db.Client could also
# do well to transform the `links` into callables, e.g:
# more_pages = pages['links']['next']()
def _list_all_db_pages(client, url_pattern=None):
chunk = 1
while chunk > 0:
pages = client.list_pages(sort=['created_at:asc'], chunk_size=1000,
chunk=chunk, url=url_pattern, active=True)
yield from pages['data']
chunk = pages['links']['next'] and (chunk + 1) or -1


def _parse_date_argument(date_string):
"""Parse a CLI argument that should represent a date into a datetime"""
if not date_string:
@@ -918,6 +972,8 @@ def main():
list of unplaybackable mementos will be written
to this file. If it exists before importing,
memento URLs listed in it will be skipped.
--precheck Check the list of versions in web-monitoring-db
and avoid re-importing duplicates.
--dry-run Don't upload data to web-monitoring-db.
"""
arguments = docopt(doc, version='0.0.1')
@@ -949,7 +1005,8 @@ def main():
url_pattern=arguments.get('--pattern'),
worker_count=int(arguments.get('--parallel')),
unplaybackable_path=arguments.get('--unplaybackable'),
dry_run=arguments.get('--dry-run'))
dry_run=arguments.get('--dry-run'),
precheck_versions=arguments.get('--precheck'))


if __name__ == '__main__':