diff --git a/openlibrary/core/models.py b/openlibrary/core/models.py
index 4f868576c89..cd3a13fea8c 100644
--- a/openlibrary/core/models.py
+++ b/openlibrary/core/models.py
@@ -2,6 +2,7 @@
 """
 from datetime import datetime, timedelta
 import logging
+from openlibrary.core.vendors import get_amazon_metadata
 import web
 import json
@@ -24,7 +25,6 @@
 from openlibrary.core.imports import ImportItem
 from openlibrary.core.observations import Observations
 from openlibrary.core.ratings import Ratings
-from openlibrary.core.vendors import create_edition_from_amazon_metadata
 from openlibrary.utils import extract_numeric_id_from_olid, dateutil
 from openlibrary.utils.isbn import to_isbn_13, isbn_13_to_isbn_10, canonical
@@ -373,11 +373,16 @@ def get_ia_download_link(self, suffix):
         return f"https://archive.org/download/{self.ocaid}/{filename}"
 
     @classmethod
-    def from_isbn(cls, isbn: str) -> "Edition | None":  # type: ignore[return]
+    def from_isbn(cls, isbn: str, high_priority: bool = False) -> "Edition | None":
         """
         Attempts to fetch an edition by ISBN, or if no edition is found,
         then check the import_item table for a match, then as a last result,
         attempt to import from Amazon.
+        :param bool high_priority: If `True`, (1) any AMZ import requests will block
+            until AMZ has fetched data, and (2) the AMZ request will go to
+            the front of the queue. If `False`, the import will simply be
+            queued up if the item is not in the AMZ cache, and the affiliate
+            server will return a promise.
         :return: an open library edition for this ISBN or None.
         """
         isbn = canonical(isbn)
@@ -401,10 +406,23 @@ def from_isbn(cls, isbn: str) -> "Edition | None":  # type: ignore[return]
             return web.ctx.site.get(matches[0])
 
         # Attempt to fetch the book from the import_item table
-        if result := ImportItem.import_first_staged(identifiers=isbns):
-            return result
+        if edition := ImportItem.import_first_staged(identifiers=isbns):
+            return edition
 
-        # TODO: Final step - call affiliate server, with retry code migrated there.
+        # Finally, try to fetch the book data from Amazon and import it.
+        # If `high_priority=True`, then the affiliate-server, which `get_amazon_metadata()`
+        # uses, will block and wait until the Product API responds and the result, if any,
+        # is staged in `import_item`.
+        try:
+            get_amazon_metadata(
+                id_=isbn10 or isbn13, id_type="isbn", high_priority=high_priority
+            )
+            return ImportItem.import_first_staged(identifiers=isbns)
+        except requests.exceptions.ConnectionError:
+            logger.exception("Affiliate Server unreachable")
+        except requests.exceptions.HTTPError:
+            logger.exception(f"Affiliate Server: id {isbn10 or isbn13} not found")
+        return None
 
     def is_ia_scan(self):
         metadata = self.get_ia_meta_fields()
diff --git a/openlibrary/core/vendors.py b/openlibrary/core/vendors.py
index 41c4c4e563b..fe6e62f4e02 100644
--- a/openlibrary/core/vendors.py
+++ b/openlibrary/core/vendors.py
@@ -2,6 +2,8 @@
 import logging
 import re
 import time
+
+from datetime import date
 from typing import Any, Literal
 
 import requests
@@ -280,16 +282,21 @@ def get_amazon_metadata(
     id_: str,
     id_type: Literal['asin', 'isbn'] = 'isbn',
     resources: Any = None,
-    retries: int = 0,
+    high_priority: bool = False,
 ) -> dict | None:
     """Main interface to Amazon LookupItem API. Will cache results.
 
     :param str id_: The item id: isbn (10/13), or Amazon ASIN.
     :param str id_type: 'isbn' or 'asin'.
+    :param bool high_priority: Priority in the import queue. High priority
+        goes to the front of the queue.
    :return: A single book item's metadata, or None.
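+
+    A hypothetical call (illustrative ISBN and return shape, not live API output;
+    assumes the affiliate server is configured):
+
+        get_amazon_metadata("059035342X", id_type="isbn", high_priority=True)
+        # -> {'title': ..., 'isbn_13': ['9780590353427'], ...} if found, else None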
""" return cached_get_amazon_metadata( - id_, id_type=id_type, resources=resources, retries=retries + id_, + id_type=id_type, + resources=resources, + high_priority=high_priority, ) @@ -307,8 +314,7 @@ def _get_amazon_metadata( id_: str, id_type: Literal['asin', 'isbn'] = 'isbn', resources: Any = None, - retries: int = 0, - sleep_sec: float = 1, + high_priority: bool = False, ) -> dict | None: """Uses the Amazon Product Advertising API ItemLookup operation to locate a specific book by identifier; either 'isbn' or 'asin'. @@ -318,12 +324,10 @@ def _get_amazon_metadata( :param str id_type: 'isbn' or 'asin'. :param Any resources: Used for AWSE Commerce Service lookup See https://webservices.amazon.com/paapi5/documentation/get-items.html - :param int retries: Number of times to query affiliate server before returning None - :param float sleep_sec: Delay time.sleep(sleep_sec) seconds before each retry + :param bool high_priority: Priority in the import queue. High priority + goes to the front of the queue. :return: A single book item's metadata, or None. """ - # TMP: This is causing a bunch of duplicate imports - return None if not affiliate_server_url: return None @@ -339,14 +343,15 @@ def _get_amazon_metadata( id_ = isbn try: - r = requests.get(f'http://{affiliate_server_url}/isbn/{id_}') + priority = "true" if high_priority else "false" + r = requests.get( + f'http://{affiliate_server_url}/isbn/{id_}?high_priority={priority}' + ) r.raise_for_status() - if hit := r.json().get('hit'): - return hit - if retries <= 1: + if data := r.json().get('hit'): + return data + else: return None - time.sleep(sleep_sec) # sleep before recursive call - return _get_amazon_metadata(id_, id_type, resources, retries - 1, sleep_sec) except requests.exceptions.ConnectionError: logger.exception("Affiliate Server unreachable") except requests.exceptions.HTTPError: @@ -415,7 +420,7 @@ def clean_amazon_metadata_for_load(metadata: dict) -> dict: def create_edition_from_amazon_metadata( - id_: str, id_type: Literal['asin', 'isbn'] = 'isbn', retries: int = 0 + id_: str, id_type: Literal['asin', 'isbn'] = 'isbn' ) -> str | None: """Fetches Amazon metadata by id from Amazon Product Advertising API, attempts to create OL edition from metadata, and returns the resulting edition key `/key/OL..M` @@ -426,7 +431,7 @@ def create_edition_from_amazon_metadata( :return: Edition key '/key/OL..M' or None """ - md = get_amazon_metadata(id_, id_type=id_type, retries=retries) + md = get_amazon_metadata(id_, id_type=id_type) if md and md.get('product_group') == 'Book': with accounts.RunAs('ImportBot'): diff --git a/openlibrary/plugins/books/code.py b/openlibrary/plugins/books/code.py index 82d29cdfea2..3560368c092 100644 --- a/openlibrary/plugins/books/code.py +++ b/openlibrary/plugins/books/code.py @@ -13,14 +13,40 @@ class books_json(delegate.page): + """ + Endpoint for mapping bib keys (e.g. ISBN, LCCN) to certain links associated + with Open Library editions, such as the thumbnail URL. + + - `bibkeys` is expected a comma separated string of ISBNs, LCCNs, etc. + - `'high_priority=true'` will attempt to import an edition from a supplied ISBN + if no matching edition is found. If not `high_priority`, then missed bib_keys + are queued for lookup on the affiliate-server, and any responses are `staged` + in `import_item`. 
+
+    Example call:
+    http://localhost:8080/api/books.json?bibkeys=059035342X,0312368615&high_priority=true
+
+    Returns a JSONified dictionary of the form:
+    {"059035342X": {
+        "bib_key": "059035342X",
+        "info_url": "http://localhost:8080/books/OL43M/Harry_Potter_and_the_Sorcerer's_Stone",
+        "preview": "noview",
+        "preview_url": "https://archive.org/details/lccn_078073006991",
+        "thumbnail_url": "https://covers.openlibrary.org/b/id/21-S.jpg"
+        },
+    "0312368615": {...}
+    }
+    """
+
     path = "/api/books"
 
     @jsonapi
     def GET(self):
-        i = web.input(bibkeys='', callback=None, details="false")
+        i = web.input(bibkeys='', callback=None, details="false", high_priority="false")
         if web.ctx.path.endswith('.json'):
             i.format = 'json'
-        return dynlinks.dynlinks(i.bibkeys.split(","), i)
+        i.high_priority = i.get("high_priority") == "true"
+        return dynlinks.dynlinks(bib_keys=i.bibkeys.split(","), options=i)
 
 
 class read_singleget(delegate.page):
diff --git a/openlibrary/plugins/books/dynlinks.py b/openlibrary/plugins/books/dynlinks.py
index a9aadec0c0c..92c7d76b884 100644
--- a/openlibrary/plugins/books/dynlinks.py
+++ b/openlibrary/plugins/books/dynlinks.py
@@ -1,8 +1,10 @@
+from typing import Any
 import json
 import sys
 
-from collections.abc import Hashable, Iterable, Mapping
+from collections.abc import Generator, Hashable, Iterable, Mapping
 
 import web
+from openlibrary.core.models import Edition
 
 from openlibrary.plugins.openlibrary.processors import urlsafe
 from openlibrary.core import helpers as h
@@ -426,7 +428,7 @@ def process_doc_for_viewapi(bib_key, page):
     return d
 
 
-def format_result(result, options):
+def format_result(result: dict, options: web.storage) -> str:
     """Format result as js or json.
 
     >>> format_result({'x': 1}, {})
@@ -447,20 +449,80 @@ def format_result(result, options):
         return "var _OLBookInfo = %s;" % json_data
 
 
-def dynlinks(bib_keys, options):
+def is_isbn(bib_key: str) -> bool:
+    """Return True if the bib_key is ostensibly an ISBN (i.e. 10 or 13 characters)."""
+    return len(bib_key) in {10, 13}
+
+
+def get_missed_isbn_bib_keys(
+    bib_keys: Iterable[str], found_records: dict
+) -> Generator[str, None, None]:
+    """
+    Return a Generator[str, None, None] with all ISBN bib_keys not in `found_records`.
+    """
+    return (
+        bib_key
+        for bib_key in bib_keys
+        if bib_key not in found_records and is_isbn(bib_key)
+    )
+
+
+def get_isbn_editiondict_map(
+    isbns: Iterable, high_priority: bool = False
+) -> dict[str, Any]:
+    """
+    Attempts to import items from their ISBN, returning a mapping of possibly
+    imported edition_dicts in the following form:
+        {isbn_string: edition_dict, ...}
+    """
+    # Get a mapping of ISBNs to new Editions (or `None`)
+    isbn_edition_map = {
+        isbn: Edition.from_isbn(isbn=isbn, high_priority=high_priority)
+        for isbn in isbns
+    }
+
+    # Convert editions to dicts, dropping ISBNs for which no edition was created.
+    return {
+        isbn: edition.dict() for isbn, edition in isbn_edition_map.items() if edition
+    }
+
+
+def dynlinks(bib_keys: Iterable[str], options: web.storage) -> str:
+    """
+    Return a JSONified dictionary of bib_keys (e.g. ISBN, LCCN) and select URLs
+    associated with the corresponding edition, if any.
+
+    If a bib key is an ISBN, options.high_priority=True, and no edition is found,
+    an import is attempted with high priority; otherwise missed bib_keys are queued
+    for lookup via the affiliate-server and responses are `staged` in `import_item`.
+
+    Example return value for a bib key of the ISBN "1452303886":
+    '{"1452303886": {"bib_key": "1452303886", "info_url": '
+    '"http://localhost:8080/books/OL24630277M/Fires_of_Prophecy_The_Morcyth_Saga_Book_Two", '
+    '"preview": "restricted", "preview_url": '
+    '"https://archive.org/details/978-1-4523-0388-8"}}'
+    """
     # for backward-compatibility
     if options.get("details", "").lower() == "true":
         options["jscmd"] = "details"
 
     try:
-        result = query_docs(bib_keys)
-        result = process_result(result, options.get('jscmd'))
+        edition_dicts = query_docs(bib_keys)
+        # For any ISBN bib_keys without hits, attempt to import and use them
+        # immediately if `high_priority`. Otherwise, queue them for lookup via the
+        # AMZ Products API and process whatever editions were found in existing data.
+        if missed_isbns := get_missed_isbn_bib_keys(bib_keys, edition_dicts):
+            new_editions = get_isbn_editiondict_map(
+                isbns=missed_isbns, high_priority=options.get("high_priority")
+            )
+            edition_dicts.update(new_editions)
+        edition_dicts = process_result(edition_dicts, options.get('jscmd'))
     except:
         print("Error in processing Books API", file=sys.stderr)
         register_exception()
-        result = {}
-    return format_result(result, options)
+        edition_dicts = {}
+    return format_result(edition_dicts, options)
 
 
 if __name__ == "__main__":
diff --git a/openlibrary/plugins/openlibrary/code.py b/openlibrary/plugins/openlibrary/code.py
index daffbfdf829..456a41b3b77 100644
--- a/openlibrary/plugins/openlibrary/code.py
+++ b/openlibrary/plugins/openlibrary/code.py
@@ -2,6 +2,7 @@
 Open Library Plugin.
 """
 
+from urllib.parse import parse_qs, urlparse, urlencode, urlunparse
 import requests
 import web
 import json
@@ -461,10 +462,33 @@ def GET(self):
         return web.ok('OK')
 
 
+def remove_high_priority(query: str) -> str:
+    """
+    Remove `high_priority=true` and `high_priority=false` from query parameters,
+    as the API expects to pass URL parameters through to another query, and
+    these may interfere with that query.
+
+    >>> remove_high_priority('high_priority=true&v=1')
+    'v=1'
+    """
+    query_params = parse_qs(query)
+    query_params.pop("high_priority", None)
+    new_query = urlencode(query_params, doseq=True)
+    return new_query
+
+
 class isbn_lookup(delegate.page):
     path = r'/(?:isbn|ISBN)/([0-9xX-]+)'
 
     def GET(self, isbn):
+        input = web.input(high_priority=False)
+
+        high_priority = input.get("high_priority") == "true"
+        if "high_priority" in web.ctx.env.get('QUERY_STRING'):
+            web.ctx.env['QUERY_STRING'] = remove_high_priority(
+                web.ctx.env.get('QUERY_STRING')
+            )
+
         # Preserve the url type (e.g. `.json`) and query params
         ext = ''
         if web.ctx.encoding and web.ctx.path.endswith('.' + web.ctx.encoding):
@@ -473,7 +497,7 @@ def GET(self, isbn):
            ext += '?' + web.ctx.env['QUERY_STRING']
 
         try:
-            if ed := Edition.from_isbn(isbn):
+            if ed := Edition.from_isbn(isbn=isbn, high_priority=high_priority):
                 return web.found(ed.key + ext)
         except Exception as e:
             logger.error(e)
diff --git a/openlibrary/plugins/upstream/borrow.py b/openlibrary/plugins/upstream/borrow.py
index 07f429fbc98..8557eceb0fe 100644
--- a/openlibrary/plugins/upstream/borrow.py
+++ b/openlibrary/plugins/upstream/borrow.py
@@ -17,6 +17,7 @@
 from infogami.utils.view import public, render_template, add_flash_message
 from infogami.infobase.utils import parse_datetime
 
+from openlibrary.core import models
 from openlibrary.core import stats
 from openlibrary.core import lending
 from openlibrary.core import vendors
diff --git a/openlibrary/tests/core/test_vendors.py b/openlibrary/tests/core/test_vendors.py
index 1804fd3f22d..37b285383d2 100644
--- a/openlibrary/tests/core/test_vendors.py
+++ b/openlibrary/tests/core/test_vendors.py
@@ -1,5 +1,8 @@
+from unittest.mock import patch
+
 import pytest
 from openlibrary.core.vendors import (
+    get_amazon_metadata,
     split_amazon_title,
     clean_amazon_metadata_for_load,
     betterworldbooks_fmt,
@@ -192,3 +195,64 @@ def test_betterworldbooks_fmt():
 
 # Test cases to add:
 # Multiple authors
+
+
+def test_get_amazon_metadata() -> None:
+    """
+    Mock a reply from the Amazon Products API so we can do a basic test for
+    get_amazon_metadata() and cached_get_amazon_metadata().
+    """
+
+    class MockRequests:
+        def get(self):
+            pass
+
+        def raise_for_status(self):
+            return True
+
+        def json(self):
+            return mock_response
+
+    mock_response = {
+        'status': 'success',
+        'hit': {
+            'url': 'https://www.amazon.com/dp/059035342X/?tag=internetarchi-20',
+            'source_records': ['amazon:059035342X'],
+            'isbn_10': ['059035342X'],
+            'isbn_13': ['9780590353427'],
+            'price': '$5.10',
+            'price_amt': 509,
+            'title': "Harry Potter and the Sorcerer's Stone",
+            'cover': 'https://m.media-amazon.com/images/I/51Wbz5GypgL._SL500_.jpg',
+            'authors': [{'name': 'Rowling, J.K.'}, {'name': 'GrandPr_, Mary'}],
+            'publishers': ['Scholastic'],
+            'number_of_pages': 309,
+            'edition_num': '1',
+            'publish_date': 'Sep 02, 1998',
+            'product_group': 'Book',
+            'physical_format': 'paperback',
+        },
+    }
+    expected = {
+        'url': 'https://www.amazon.com/dp/059035342X/?tag=internetarchi-20',
+        'source_records': ['amazon:059035342X'],
+        'isbn_10': ['059035342X'],
+        'isbn_13': ['9780590353427'],
+        'price': '$5.10',
+        'price_amt': 509,
+        'title': "Harry Potter and the Sorcerer's Stone",
+        'cover': 'https://m.media-amazon.com/images/I/51Wbz5GypgL._SL500_.jpg',
+        'authors': [{'name': 'Rowling, J.K.'}, {'name': 'GrandPr_, Mary'}],
+        'publishers': ['Scholastic'],
+        'number_of_pages': 309,
+        'edition_num': '1',
+        'publish_date': 'Sep 02, 1998',
+        'product_group': 'Book',
+        'physical_format': 'paperback',
+    }
+    isbn = "059035342X"
+    with patch("requests.get", return_value=MockRequests()), patch(
+        "openlibrary.core.vendors.affiliate_server_url", new=True
+    ):
+        got = get_amazon_metadata(id_=isbn, id_type="isbn")
+        assert got == expected
diff --git a/scripts/affiliate_server.py b/scripts/affiliate_server.py
index c1c8ea71554..e615ff492be 100644
--- a/scripts/affiliate_server.py
+++ b/scripts/affiliate_server.py
@@ -33,7 +33,6 @@
 products = web.amazon_api.get_products(["195302114X", "0312368615"], serialize=True)
 ```
 """
-
 import itertools
 import json
 import logging
@@ -42,7 +41,11 @@
 import sys
 import threading
 import time
-from datetime import date
+
+from dataclasses import dataclass, field
+from datetime import datetime
+from enum import Enum
+from typing import Final
 
 import web
 
@@ -52,7 +55,7 @@
 from infogami import config
 from openlibrary.config import load_config as openlibrary_load_config
 from openlibrary.core import cache, stats
-from openlibrary.core.imports import Batch
+from openlibrary.core.imports import Batch, ImportItem
 from openlibrary.core.vendors import AmazonAPI, clean_amazon_metadata_for_load
 from openlibrary.utils.dateutil import WEEK_SECS
 from openlibrary.utils.isbn import (
@@ -81,22 +84,23 @@
     'publish_date': 'publish_date',
     'number_of_pages': 'number_of_pages',
 }
+RETRIES: Final = 5
 
-batch_name = ""
 batch: Batch | None = None
-web.amazon_queue = queue.Queue()  # a thread-safe multi-producer, multi-consumer queue
+web.amazon_queue = (
+    queue.PriorityQueue()
+)  # a thread-safe multi-producer, multi-consumer queue
 web.amazon_lookup_thread = None
 
 
 def get_current_amazon_batch() -> Batch:
     """
-    At startup or when the month changes, create a new openlibrary.core.imports.Batch()
+    At startup, get the Amazon openlibrary.core.imports.Batch()
     for global use.
     """
-    global batch_name, batch
-    if batch_name != (new_batch_name := f"amz-{date.today():%Y%m}"):
-        batch_name = new_batch_name
-        batch = Batch.find(batch_name) or Batch.new(batch_name)
+    global batch
+    if not batch:
+        batch = Batch.find("amz") or Batch.new("amz")
     assert batch
     return batch
@@ -201,13 +205,17 @@ def process_amazon_batch(isbn_10s: list[str]) -> None:
         return
 
     books = [clean_amazon_metadata_for_load(product) for product in products]
-    if books and (pending_books := get_pending_books(books)):
+
+    if books:
         stats.increment(
             "ol.affiliate.amazon.total_items_batched_for_import",
-            n=len(pending_books),
+            n=len(books),
         )
         get_current_amazon_batch().add_items(
-            [{'ia_id': b['source_records'][0], 'data': b} for b in pending_books]
+            [
+                {'ia_id': b['source_records'][0], 'status': 'staged', 'data': b}
+                for b in books
+            ]
         )
@@ -230,7 +238,7 @@ def amazon_lookup(site, stats_client, logger) -> None:
     while len(isbn_10s) < API_MAX_ITEMS_PER_CALL and seconds_remaining(start_time):
         try:  # queue.get() will block (sleep) until successful or it times out
             isbn_10s.add(
-                web.amazon_queue.get(timeout=seconds_remaining(start_time))
+                web.amazon_queue.get(timeout=seconds_remaining(start_time)).isbn
             )
         except queue.Empty:
             pass
@@ -282,6 +290,45 @@ def GET(self) -> str:
         return json.dumps({"Cleared": "True", "qsize": qsize})
 
 
+class Priority(Enum):
+    """
+    Priority for the `PrioritizedISBN` class.
+
+    `queue.PriorityQueue` has a lowest-value-is-highest-priority system, but
+    setting `PrioritizedISBN.priority` to 0 can make it look as if priority is
+    disabled. Using an `Enum` can help with that.
+    """
+
+    HIGH = 0
+    LOW = 1
+
+    def __lt__(self, other):
+        if isinstance(other, Priority):
+            return self.value < other.value
+        return NotImplemented
+
+
+@dataclass(order=True, slots=True)
+class PrioritizedISBN:
+    """
+    Represent an ISBN's priority in the queue. Sorting is based on the `priority`
+    attribute, then on `timestamp` to break ties within a given priority, with
+    precedence going to whatever `min([items])` would return.
+    For more, see https://docs.python.org/3/library/queue.html#queue.PriorityQueue.
+
+    Therefore, priority 0, which is equivalent to `Priority.HIGH`, is the highest
+    priority.
+
+    This exists so certain ISBNs can go to the front of the queue for faster
+    processing, as their lookups are time sensitive and should return lookup data
+    to the caller (e.g. interactive API usage through `/isbn`).
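+
+    An illustrative ordering (a hypothetical sketch; the ISBNs are examples and
+    `timestamp` defaults to `datetime.now()`):
+
+        q = queue.PriorityQueue()
+        q.put(PrioritizedISBN(isbn="0312368615"))  # defaults to Priority.LOW
+        q.put(PrioritizedISBN(isbn="059035342X", priority=Priority.HIGH))
+        q.get().isbn  # -> "059035342X", as HIGH (0) sorts before LOW (1)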
+ """ + + isbn: str = field(compare=False) + priority: Priority = field(default=Priority.LOW) + timestamp: datetime = field(default_factory=datetime.now) + + class Submit: @classmethod def unpack_isbn(cls, isbn) -> tuple[str, str]: @@ -296,8 +343,16 @@ def unpack_isbn(cls, isbn) -> tuple[str, str]: def GET(self, isbn: str) -> str: """ - If isbn is in memcache then return the `hit`. If not then queue the isbn to be - looked up and return the equivalent of a promise as `submitted` + If `isbn` is in memcache, then return the `hit` (which is marshaled into + a format appropriate for import on Open Library if `?high_priority=true`). + + If no hit, then queue the isbn for look up and either attempt to return + a promise as `submitted`, or if `?high_priority=true`, return marshalled data + from the cache. + + `Priority.HIGH` is set when `?high_priority=true` and is the highest priority. + It is used when the caller is waiting for a response with the AMZ data, if + available. See `PrioritizedISBN` for more on prioritization. """ # cache could be None if reached before initialized (mypy) if not web.amazon_api: @@ -309,13 +364,25 @@ def GET(self, isbn: str) -> str: {"error": "rejected_isbn", "isbn10": isbn10, "isbn13": isbn13} ) + input = web.input(high_priority=False) + priority = ( + Priority.HIGH if input.get("high_priority") == "true" else Priority.LOW + ) + # Cache lookup by isbn13. If there's a hit return the product to the caller if product := cache.memcache_cache.get(f'amazon_product_{isbn13}'): - return json.dumps({"status": "success", "hit": product}) + return json.dumps( + { + "status": "success", + "hit": product, + } + ) - # Cache misses will be submitted to Amazon as ASINs (isbn10) + # Cache misses will be submitted to Amazon as ASINs (isbn10) and the response + # will be `staged` for import. if isbn10 not in web.amazon_queue.queue: - web.amazon_queue.put_nowait(isbn10) + isbn10_queue_item = PrioritizedISBN(isbn=isbn10, priority=priority) + web.amazon_queue.put_nowait(isbn10_queue_item) # Give us a snapshot over time of how many new isbns are currently queued stats.put( @@ -323,7 +390,27 @@ def GET(self, isbn: str) -> str: web.amazon_queue.qsize(), rate=0.2, ) - return json.dumps({"status": "submitted", "queue": web.amazon_queue.qsize()}) + + # Check the cache a few times for product data to return to the client, + # or otherwise return. + if priority == Priority.HIGH: + for _ in range(RETRIES): + time.sleep(1) + if product := cache.memcache_cache.get(f'amazon_product_{isbn13}'): + cleaned_metadata = clean_amazon_metadata_for_load(product) + source, pid = cleaned_metadata['source_records'][0].split(":") + if ImportItem.find_staged_or_pending( + identifiers=[pid], sources=[source] + ): + return json.dumps({"status": "success", "hit": product}) + + stats.increment("ol.affiliate.amazon.total_items_not_found") + return json.dumps({"status": "not found"}) + + else: + return json.dumps( + {"status": "submitted", "queue": web.amazon_queue.qsize()} + ) def load_config(configfile):