Allow /isbn to import Amazon-specific B* ASINs
The Amazon Products API used by the affiliate server can return product
information for Amazon-specific ASINs that start with `B`. This commit
makes changes sufficient to allow `/isbn` to support "ISBNs" (i.e.
Amazon-specific ASINs) that start with `B`.

At a high level, validation has been modified throughout the pipeline to
allow `B` ASINs, from `/isbn` on through to the validation for importing
items from Amazon.
scottbarnes committed Mar 13, 2024
1 parent 14e3ef7 commit f2f509f
Showing 5 changed files with 160 additions and 48 deletions.
9 changes: 9 additions & 0 deletions openlibrary/catalog/utils/__init__.py
@@ -340,6 +340,15 @@ def needs_isbn_and_lacks_one(rec: dict) -> bool:
"""

def needs_isbn(rec: dict) -> bool:
# Exception for Amazon-specific ASINs, which often accompany ebooks
if any(
name == "amazon" and identifier.startswith("B")
for record in rec.get("source_records", [])
if record and ":" in record
for name, identifier in [record.split(":", 1)]
):
return False

return any(
record.split(":")[0] in BOOKSELLERS_WITH_ADDITIONAL_VALIDATION
for record in rec.get('source_records', [])
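
For illustration, a minimal standalone sketch of the new guard (the helper name is hypothetical; the real check is nested inside `needs_isbn_and_lacks_one`, and the record values mirror the tests added below):

```python
def has_amazon_b_asin(rec: dict) -> bool:
    # Standalone copy of the new guard in `needs_isbn`, for illustration only.
    return any(
        name == "amazon" and identifier.startswith("B")
        for record in rec.get("source_records", [])
        if record and ":" in record
        for name, identifier in [record.split(":", 1)]
    )

assert has_amazon_b_asin({"source_records": ["amazon:B06XYHVXVJ"]})      # B* ASIN: ISBN not required
assert not has_amazon_b_asin({"source_records": ["amazon:0747532699"]})  # ordinary ISBN 10 ASIN
assert not has_amazon_b_asin({"source_records": ["bwb:9780747532699"]})  # different bookseller
```
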
48 changes: 34 additions & 14 deletions openlibrary/core/models.py
@@ -385,39 +385,59 @@ def from_isbn(cls, isbn: str, high_priority: bool = False) -> "Edition | None":
         server will return a promise.
         :return: an open library edition for this ISBN or None.
         """
+        asin = isbn if isbn.startswith("B") else ""
         isbn = canonical(isbn)

-        if len(isbn) not in [10, 13]:
+        if len(isbn) not in [10, 13] and len(asin) not in [10, 13]:
             return None  # consider raising ValueError

         isbn13 = to_isbn_13(isbn)
-        if isbn13 is None:
+        if isbn13 is None and not isbn:
             return None  # consider raising ValueError

         isbn10 = isbn_13_to_isbn_10(isbn13)
-        isbns = [isbn13, isbn10] if isbn10 is not None else [isbn13]
+        book_ids: list[str] = []
+        if isbn10 is not None:
+            book_ids.extend(
+                [isbn10, isbn13]
+            ) if isbn13 is not None else book_ids.append(isbn10)
+        elif asin is not None:
+            book_ids.append(asin)
+        else:
+            book_ids.append(isbn13)

         # Attempt to fetch book from OL
-        for isbn in isbns:
-            if isbn:
-                matches = web.ctx.site.things(
-                    {"type": "/type/edition", 'isbn_%s' % len(isbn): isbn}
-                )
-                if matches:
+        for book_id in book_ids:
+            if book_id == asin:
+                if matches := web.ctx.site.things(
+                    {"type": "/type/edition", 'identifiers': {'amazon': asin}}
+                ):
+                    return web.ctx.site.get(matches[0])
+            elif book_id and (
+                matches := web.ctx.site.things(
+                    {"type": "/type/edition", 'isbn_%s' % len(book_id): book_id}
+                )
+            ):
                 return web.ctx.site.get(matches[0])

         # Attempt to fetch the book from the import_item table
-        if edition := ImportItem.import_first_staged(identifiers=isbns):
+        if edition := ImportItem.import_first_staged(identifiers=book_ids):
             return edition

         # Finally, try to fetch the book data from Amazon + import.
         # If `high_priority=True`, then the affiliate-server, which `get_amazon_metadata()`
         # uses, will block + wait until the Product API responds and the result, if any,
         # is staged in `import_item`.
         try:
-            get_amazon_metadata(
-                id_=isbn10 or isbn13, id_type="isbn", high_priority=high_priority
-            )
-            return ImportItem.import_first_staged(identifiers=isbns)
+            if asin:
+                get_amazon_metadata(
+                    id_=asin, id_type="asin", high_priority=high_priority
+                )
+            else:
+                get_amazon_metadata(
+                    id_=isbn10 or isbn13, id_type="isbn", high_priority=high_priority
+                )
+            return ImportItem.import_first_staged(identifiers=book_ids)
         except requests.exceptions.ConnectionError:
             logger.exception("Affiliate Server unreachable")
         except requests.exceptions.HTTPError:
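
A hedged sketch of the resulting lookup order for a B* ASIN (the identifier is illustrative, and this assumes a live Open Library web context):

```python
# Sketch only: relies on Open Library's runtime (web.ctx) and the imports above.
edition = Edition.from_isbn("B06XYHVXVJ", high_priority=True)
# For a B* ASIN, `book_ids == ["B06XYHVXVJ"]`, so the method tries, in order:
#   1. an edition whose identifiers include {'amazon': 'B06XYHVXVJ'},
#   2. staged rows in the import_item table, and
#   3. `get_amazon_metadata(..., id_type="asin")`, which blocks on the
#      affiliate server when high_priority=True, then re-checks staging.
```
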
2 changes: 1 addition & 1 deletion openlibrary/plugins/openlibrary/code.py
@@ -478,7 +478,7 @@ def remove_high_priority(query: str) -> str:


 class isbn_lookup(delegate.page):
-    path = r'/(?:isbn|ISBN)/([0-9xX-]+)'
+    path = r'/(?:isbn|ISBN)/([bB]?[0-9a-zA-Z-]+)'

     def GET(self, isbn):
         input = web.input(high_priority=False)
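
A quick sanity check of the loosened route pattern (a sketch; the identifiers are illustrative):

```python
import re

path = r'/(?:isbn|ISBN)/([bB]?[0-9a-zA-Z-]+)'

assert re.fullmatch(path, '/isbn/9780747532699')  # plain ISBNs still match
assert re.fullmatch(path, '/ISBN/B06XYHVXVJ')     # Amazon-specific B* ASINs now match
# The capture group is deliberately broad; real validation happens downstream,
# e.g. in `Edition.from_isbn` and the affiliate server's `unpack_isbn`.
```
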
108 changes: 75 additions & 33 deletions scripts/affiliate_server.py
@@ -45,7 +45,7 @@
 from dataclasses import dataclass, field
 from datetime import datetime
 from enum import Enum
-from typing import Final
+from typing import Any, Final

 import web

@@ -68,7 +68,7 @@

 # fmt: off
 urls = (
-    '/isbn/([0-9X-]+)', 'Submit',
+    '/isbn/([bB]?[0-9a-zA-Z-]+)', 'Submit',
     '/status', 'Status',
     '/clear', 'Clear',
 )
@@ -171,30 +171,52 @@ def get_pending_books(books):
     return pending_books


-def process_amazon_batch(isbn_10s: list[str]) -> None:
+def make_cache_key(product: dict[str, Any]) -> str:
+    """
+    Takes a `product` returned from `vendor.get_products()` and returns a cache key to
+    identify the product. For a given product, the cache key will be either (1) its
+    ISBN 13, or (2) its non-ISBN 10 ASIN (i.e. one that starts with `B`).
+    """
+    if (isbn_13s := product.get("isbn_13")) and len(isbn_13s):
+        return isbn_13s[0]
+
+    if product.get("isbn_10") and (
+        cache_key := isbn_10_to_isbn_13(product.get("isbn_10", [])[0])
+    ):
+        return cache_key
+
+    if (source_records := product.get("source_records")) and (
+        amazon_record := next(
+            (record for record in source_records if record.startswith("amazon:")), ""
+        )
+    ):
+        return amazon_record.split(":")[1]
+
+    return ""
+
+
+def process_amazon_batch(isbn_10s_or_asins: list[str]) -> None:
     """
     Call the Amazon API to get the products for a list of isbn_10s and store
     each product in memcache using amazon_product_{isbn_13} as the cache key.
     """
-    logger.info(f"process_amazon_batch(): {len(isbn_10s)} items")
+    logger.info(f"process_amazon_batch(): {len(isbn_10s_or_asins)} items")
     try:
-        products = web.amazon_api.get_products(isbn_10s, serialize=True)
+        products = web.amazon_api.get_products(isbn_10s_or_asins, serialize=True)
         # stats_ol_affiliate_amazon_imports - Open Library - Dashboards - Grafana
         # http://graphite.us.archive.org Metrics.stats.ol...
         stats.increment(
             "ol.affiliate.amazon.total_items_fetched",
             n=len(products),
         )
     except Exception:
-        logger.exception(f"amazon_api.get_products({isbn_10s}, serialize=True)")
+        logger.exception(
+            f"amazon_api.get_products({isbn_10s_or_asins}, serialize=True)"
+        )
         return

     for product in products:
-        cache_key = (  # Make sure we have an isbn_13 for cache key
-            product.get('isbn_13')
-            and product.get('isbn_13')[0]
-            or isbn_10_to_isbn_13(product.get('isbn_10')[0])
-        )
+        cache_key = make_cache_key(product)  # isbn_13 or non-ISBN-10 ASIN.
         cache.memcache_cache.set(  # Add each product to memcache
             f'amazon_product_{cache_key}', product, expires=WEEK_SECS
         )
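
For reference, how `make_cache_key` falls through its three sources on some sketched product dicts (the same shapes exercised by the parametrized tests added below):

```python
# Each dict is a sketch of one `vendor.get_products()` product.
make_cache_key({"isbn_13": ["9780747532699"]})             # -> '9780747532699' (ISBN 13 wins)
make_cache_key({"isbn_10": ["0747532699"]})                # -> '9780747532699' (10 upgraded to 13)
make_cache_key({"source_records": ["amazon:B06XYHVXVJ"]})  # -> 'B06XYHVXVJ' (non-ISBN ASIN)
make_cache_key({})                                         # -> '' (nothing usable)
```
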
@@ -234,20 +256,22 @@ def amazon_lookup(site, stats_client, logger) -> None:

     while True:
         start_time = time.time()
-        isbn_10s: set[str] = set()  # no duplicates in the batch
-        while len(isbn_10s) < API_MAX_ITEMS_PER_CALL and seconds_remaining(start_time):
+        isbn_10s_or_asins: set[str] = set()  # no duplicates in the batch
+        while len(isbn_10s_or_asins) < API_MAX_ITEMS_PER_CALL and seconds_remaining(
+            start_time
+        ):
             try:  # queue.get() will block (sleep) until successful or it times out
-                isbn_10s.add(
+                isbn_10s_or_asins.add(
                     web.amazon_queue.get(timeout=seconds_remaining(start_time)).isbn
                 )
             except queue.Empty:
                 pass
-        logger.info(f"Before amazon_lookup(): {len(isbn_10s)} items")
-        if isbn_10s:
+        logger.info(f"Before amazon_lookup(): {len(isbn_10s_or_asins)} items")
+        if isbn_10s_or_asins:
             time.sleep(seconds_remaining(start_time))
             try:
-                process_amazon_batch(list(isbn_10s))
-                logger.info(f"After amazon_lookup(): {len(isbn_10s)} items")
+                process_amazon_batch(list(isbn_10s_or_asins))
+                logger.info(f"After amazon_lookup(): {len(isbn_10s_or_asins)} items")
             except Exception:
                 logger.exception("Amazon Lookup Thread died")
                 stats_client.incr("ol.affiliate.amazon.lookup_thread_died")
@@ -322,6 +346,8 @@ class PrioritizedISBN:
     This exists so certain ISBNs can go to the front of the queue for faster
     processing as their look-ups are time sensitive and should return look up data
     to the caller (e.g. interactive API usage through `/isbn`).
+
+    Note: also handles Amazon-specific ASINs.
     """

     isbn: str = field(compare=False)
@@ -332,6 +358,16 @@ class Submit:
 class Submit:
     @classmethod
     def unpack_isbn(cls, isbn) -> tuple[str, str]:
+        """
+        Return an "ASIN" and an ISBN 13 if possible. Here, an "ASIN" is an
+        ISBN 10 if the input is an ISBN, and an Amazon-specific ASIN starting
+        with "B" if it's not an ISBN 10.
+        """
+        # If the "isbn" is an Amazon-specific ASIN, return it.
+        if len(isbn) == 10 and isbn.startswith("B"):
+            return (isbn, "")
+
+        # Get ISBN 10 and 13.
         isbn = normalize_isbn(isbn.replace('-', ''))
         isbn10 = (
             isbn
@@ -341,48 +377,52 @@ def unpack_isbn(cls, isbn) -> tuple[str, str]:
         isbn13 = isbn if len(isbn) == 13 else isbn_10_to_isbn_13(isbn)
         return isbn10, isbn13

-    def GET(self, isbn: str) -> str:
+    def GET(self, isbn_or_asin: str) -> str:
         """
-        If `isbn` is in memcache, then return the `hit` (which is marshaled into
-        a format appropriate for import on Open Library if `?high_priority=true`).
+        If `isbn_or_asin` is in memcache, then return the `hit` (which is marshalled
+        into a format appropriate for import on Open Library if `?high_priority=true`).

-        If no hit, then queue the isbn for look up and either attempt to return
+        If no hit, then queue the isbn_or_asin for look up and either attempt to return
         a promise as `submitted`, or if `?high_priority=true`, return marshalled data
         from the cache.

         `Priority.HIGH` is set when `?high_priority=true` and is the highest priority.
         It is used when the caller is waiting for a response with the AMZ data, if
         available. See `PrioritizedISBN` for more on prioritization.
+
+        NOTE: For this API, "ASINs" are ISBN 10s when valid ISBN 10s, and otherwise
+        they are Amazon-specific identifiers starting with "B".
         """
         # cache could be None if reached before initialized (mypy)
         if not web.amazon_api:
             return json.dumps({"error": "not_configured"})

-        isbn10, isbn13 = self.unpack_isbn(isbn)
-        if not isbn10:
+        asin, isbn13 = self.unpack_isbn(isbn_or_asin)
+        if not asin:
             return json.dumps(
-                {"error": "rejected_isbn", "isbn10": isbn10, "isbn13": isbn13}
+                {"error": "rejected_isbn", "asin": asin, "isbn13": isbn13}
             )

         input = web.input(high_priority=False)
         priority = (
             Priority.HIGH if input.get("high_priority") == "true" else Priority.LOW
         )

-        # Cache lookup by isbn13. If there's a hit return the product to the caller
-        if product := cache.memcache_cache.get(f'amazon_product_{isbn13}'):
+        # Cache lookup by isbn13 or asin. If there's a hit return the product to
+        # the caller.
+        if product := cache.memcache_cache.get(f'amazon_product_{isbn13 or asin}'):
             return json.dumps(
                 {
                     "status": "success",
                     "hit": product,
                 }
             )

-        # Cache misses will be submitted to Amazon as ASINs (isbn10) and the response
-        # will be `staged` for import.
-        if isbn10 not in web.amazon_queue.queue:
-            isbn10_queue_item = PrioritizedISBN(isbn=isbn10, priority=priority)
-            web.amazon_queue.put_nowait(isbn10_queue_item)
+        # Cache misses will be submitted to Amazon as ASINs (isbn10 if possible, or
+        # a 'true' ASIN otherwise) and the response will be `staged` for import.
+        if asin not in web.amazon_queue.queue:
+            asin_queue_item = PrioritizedISBN(isbn=asin, priority=priority)
+            web.amazon_queue.put_nowait(asin_queue_item)

         # Give us a snapshot over time of how many new isbns are currently queued
         stats.put(
@@ -396,7 +436,9 @@ def GET(self, isbn: str) -> str:
         if priority == Priority.HIGH:
             for _ in range(RETRIES):
                 time.sleep(1)
-                if product := cache.memcache_cache.get(f'amazon_product_{isbn13}'):
+                if product := cache.memcache_cache.get(
+                    f'amazon_product_{isbn13 or asin}'
+                ):
                     cleaned_metadata = clean_amazon_metadata_for_load(product)
                     source, pid = cleaned_metadata['source_records'][0].split(":")
                     if ImportItem.find_staged_or_pending(
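
Taken together, the affiliate server's `/isbn` route now accepts either identifier type. A hedged usage sketch (the host and port are placeholders, and the ASIN is illustrative):

```python
import requests

BASE = "http://localhost:8080"  # placeholder; point at a running affiliate server

# An ISBN 13 and an Amazon-specific B* ASIN now take the same path.
for identifier in ("9780747532699", "B06XYHVXVJ"):
    r = requests.get(f"{BASE}/isbn/{identifier}", params={"high_priority": "true"})
    print(r.json())  # e.g. {"status": "success", "hit": {...}} on a cache hit
```
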
41 changes: 41 additions & 0 deletions scripts/tests/test_affiliate_server.py
@@ -5,17 +5,22 @@
 # docker compose run --rm home pytest scripts/tests/test_affiliate_server.py
 """
 import sys
+from typing import Any
 from unittest.mock import MagicMock

+import pytest
+
 # TODO: Can we remove _init_path someday :(
 sys.modules['_init_path'] = MagicMock()

 from openlibrary.mocks.mock_infobase import mock_site  # noqa: F401
 from scripts.affiliate_server import (  # noqa: E402
+    Submit,
     get_isbns_from_book,
     get_isbns_from_books,
     get_editions_for_books,
     get_pending_books,
+    make_cache_key,
 )

 ol_editions = {
@@ -118,3 +123,39 @@ def test_get_isbns_from_books():
         '1234567890124',
         '1234567891',
     ]
+
+
+@pytest.mark.parametrize(
+    ["isbn_or_asin", "expected_key"],
+    [
+        ({"isbn_10": [], "isbn_13": ["9780747532699"]}, "9780747532699"),  # Use 13.
+        (
+            {"isbn_10": ["0747532699"], "source_records": ["amazon:B06XYHVXVJ"]},
+            "9780747532699",
+        ),  # 10 -> 13.
+        (
+            {"isbn_10": [], "isbn_13": [], "source_records": ["amazon:B06XYHVXVJ"]},
+            "B06XYHVXVJ",
+        ),  # Get non-ISBN 10 ASIN from `source_records` if necessary.
+        ({"isbn_10": [], "isbn_13": [], "source_records": []}, ""),  # Nothing to use.
+        ({}, ""),  # Nothing to use.
+    ],
+)
+def test_make_cache_key(isbn_or_asin: dict[str, Any], expected_key: str) -> None:
+    got = make_cache_key(isbn_or_asin)
+    assert got == expected_key
+
+
+@pytest.mark.parametrize(
+    ["isbn_or_asin", "expected"],
+    [
+        ("0123456789", ("0123456789", "9780123456786")),
+        ("0-.123456789", ("0123456789", "9780123456786")),
+        ("9780123456786", ("0123456789", "9780123456786")),
+        ("9-.780123456786", ("0123456789", "9780123456786")),
+        ("B012346789", ("B012346789", "")),
+    ],
+)
+def test_unpack_isbn(isbn_or_asin, expected) -> None:
+    got = Submit.unpack_isbn(isbn_or_asin)
+    assert got == expected
