Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better get_feed_es error logging. Fix fetch related saves + reposts #3797

Merged
merged 4 commits into from
Sep 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def test_get_feed_es(app):
feed_results = get_feed_es({"user_id": "1"})
assert feed_results[0]["playlist_id"] == 1
assert feed_results[0]["save_count"] == 1
assert len(feed_results[0]["followee_reposts"]) == 1

assert feed_results[1]["track_id"] == 1
assert feed_results[0]["save_count"] == 1
assert feed_results[1]["save_count"] == 1
6 changes: 5 additions & 1 deletion discovery-provider/src/queries/get_feed.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
import logging

from flask import request
from sqlalchemy import and_, desc, func, or_
Expand All @@ -24,6 +25,8 @@

trackDedupeMaxMinutes = 10

logger = logging.getLogger(__name__)


def get_feed(args):
skip_es = request.args.get("es") == "0"
Expand All @@ -32,7 +35,8 @@ def get_feed(args):
try:
(limit, _) = get_pagination_vars()
return get_feed_es(args, limit)
except:
except Exception as e:
logger.error(f"elasticsearch get_feed_es failed: {e}")
return get_feed_sql(args)
else:
return get_feed_sql(args)
Expand Down
44 changes: 30 additions & 14 deletions discovery-provider/src/queries/get_feed_es.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from collections import defaultdict

from src.queries.query_helpers import get_users_ids
from src.utils.elasticdsl import (
ES_PLAYLISTS,
Expand Down Expand Up @@ -218,7 +216,7 @@ def get_feed_es(args, limit=10):
item_keys = [i["item_key"] for i in sorted_feed]

(follow_saves, follow_reposts) = fetch_followed_saves_and_reposts(
current_user_id, item_keys, limit * 20
current_user_id, item_keys
)

for item in sorted_feed:
Expand Down Expand Up @@ -255,7 +253,13 @@ def following_ids_terms_lookup(current_user_id, field):
}


def fetch_followed_saves_and_reposts(current_user_id, item_keys, limit):
def fetch_followed_saves_and_reposts(current_user_id, item_keys):
follow_reposts = {k: [] for k in item_keys}
follow_saves = {k: [] for k in item_keys}

if not current_user_id or not item_keys:
return (follow_saves, follow_reposts)

save_repost_query = {
"query": {
"bool": {
Expand All @@ -266,8 +270,18 @@ def fetch_followed_saves_and_reposts(current_user_id, item_keys, limit):
]
}
},
"size": limit * 20, # how much to overfetch?
"collapse": {
"field": "item_key",
"inner_hits": {
"name": "most_recent",
"size": 5,
"sort": [{"created_at": "desc"}],
},
"max_concurrent_group_searches": 4,
},
"sort": {"created_at": "desc"},
"size": len(item_keys),
"_source": False,
}
mdsl = [
{"index": ES_REPOSTS},
Expand All @@ -277,15 +291,17 @@ def fetch_followed_saves_and_reposts(current_user_id, item_keys, limit):
]

founds = esclient.msearch(searches=mdsl)
(reposts, saves) = [pluck_hits(r) for r in founds["responses"]]

follow_reposts = defaultdict(list)
follow_saves = defaultdict(list)

for r in reposts:
follow_reposts[r["item_key"]].append(r)
for s in saves:
follow_saves[s["item_key"]].append(s)
collapsed_reposts = founds["responses"][0]["hits"]["hits"]
collapsed_saves = founds["responses"][1]["hits"]["hits"]

for group in collapsed_reposts:
reposts = pluck_hits(group["inner_hits"]["most_recent"])
item_key = reposts[0]["item_key"]
follow_reposts[item_key] = reposts
for group in collapsed_saves:
saves = pluck_hits(group["inner_hits"]["most_recent"])
item_key = saves[0]["item_key"]
follow_saves[item_key] = saves

return (follow_saves, follow_reposts)

Expand Down
56 changes: 17 additions & 39 deletions discovery-provider/src/queries/search_es.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
import logging
from typing import Any, Dict, Optional

from src.api.v1.helpers import (
extend_favorite,
extend_playlist,
extend_repost,
extend_track,
extend_user,
)
from src.api.v1.helpers import extend_playlist, extend_track, extend_user
from src.queries.get_feed_es import fetch_followed_saves_and_reposts, item_key
from src.utils.elasticdsl import (
ES_PLAYLISTS,
Expand Down Expand Up @@ -218,7 +212,7 @@ def tag_match(fieldname):
if current_user_id:
response["followed_users"] = pluck_hits(mfound["responses"].pop(0))

finalize_response(response, limit, current_user_id, legacy_mode=True)
finalize_response(response, limit, current_user_id)
return response


Expand All @@ -240,16 +234,10 @@ def finalize_response(
response: Dict,
limit: int,
current_user_id: Optional[int],
legacy_mode=False,
is_auto_complete=False,
):
"""Hydrates users and contextualizes results for current user (if applicable).
Also removes extra indexed fields so as to match the fieldset from postgres.

legacy_mode=True will skip v1 api transforms.
This is similar to the code in get_feed_es (which does its own thing,
but could one day use finalize_response with legacy_mode=True).
e.g. doesn't encode IDs and populates `followee_saves` instead of `followee_reposts`
"""
if not esclient:
raise Exception("esclient is None")
Expand Down Expand Up @@ -280,37 +268,34 @@ def finalize_response(
users_by_id[id] = populate_user_metadata_es(user, current_user)

# fetch followed saves + reposts
# TODO: instead of limit param (20) should do an agg to get 3 saves / reposts per item_key
if not is_auto_complete:
(follow_saves, follow_reposts) = fetch_followed_saves_and_reposts(
current_user_id, item_keys, 20
current_user_id, item_keys
)

# tracks: finalize
for k in ["tracks", "saved_tracks"]:
tracks = response[k]
hydrate_user(tracks, users_by_id)
if not is_auto_complete:
hydrate_saves_reposts(tracks, follow_saves, follow_reposts, legacy_mode)
response[k] = [map_track(track, current_user, legacy_mode) for track in tracks]
hydrate_saves_reposts(tracks, follow_saves, follow_reposts)
response[k] = [map_track(track, current_user) for track in tracks]

# users: finalize
for k in ["users", "followed_users"]:
users = drop_copycats(response[k])
users = users[:limit]
response[k] = [map_user(user, current_user, legacy_mode) for user in users]
response[k] = [map_user(user, current_user) for user in users]

# playlists: finalize
for k in ["playlists", "saved_playlists", "albums", "saved_albums"]:
if k not in response:
continue
playlists = response[k]
if not is_auto_complete:
hydrate_saves_reposts(playlists, follow_saves, follow_reposts, legacy_mode)
hydrate_saves_reposts(playlists, follow_saves, follow_reposts)
hydrate_user(playlists, users_by_id)
response[k] = [
map_playlist(playlist, current_user, legacy_mode) for playlist in playlists
]
response[k] = [map_playlist(playlist, current_user) for playlist in playlists]

return response

Expand Down Expand Up @@ -480,33 +465,26 @@ def hydrate_user(items, users_by_id):
item["user"] = user


def hydrate_saves_reposts(items, follow_saves, follow_reposts, legacy_mode):
def hydrate_saves_reposts(items, follow_saves, follow_reposts):
for item in items:
ik = item_key(item)
if legacy_mode:
item["followee_reposts"] = follow_reposts[ik]
item["followee_saves"] = follow_saves[ik]
else:
item["followee_reposts"] = [extend_repost(r) for r in follow_reposts[ik]]
item["followee_favorites"] = [extend_favorite(x) for x in follow_saves[ik]]
item["followee_reposts"] = follow_reposts[ik]
item["followee_saves"] = follow_saves[ik]


def map_user(user, current_user, legacy_mode):
def map_user(user, current_user):
user = populate_user_metadata_es(user, current_user)
if not legacy_mode:
user = extend_user(user)
user = extend_user(user)
return user


def map_track(track, current_user, legacy_mode):
def map_track(track, current_user):
track = populate_track_or_playlist_metadata_es(track, current_user)
if not legacy_mode:
track = extend_track(track)
track = extend_track(track)
return track


def map_playlist(playlist, current_user, legacy_mode):
def map_playlist(playlist, current_user):
playlist = populate_track_or_playlist_metadata_es(playlist, current_user)
if not legacy_mode:
playlist = extend_playlist(playlist)
playlist = extend_playlist(playlist)
return playlist
18 changes: 3 additions & 15 deletions discovery-provider/src/utils/elasticdsl.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ def listify(things):


def pluck_hits(found):
if "error" in found:
raise Exception(found["error"])

res = [h["_source"] for h in found["hits"]["hits"]]

# add score for search_quality.py script
Expand All @@ -34,21 +37,6 @@ def pluck_hits(found):
return res


def docs_and_ids(found, id_set=False):
    """Split an Elasticsearch response into parallel doc and id sequences.

    Returns a (docs, ids) tuple where docs is the list of ``_source``
    payloads and ids is the list of ``_id`` values in hit order;
    ids is converted to a set when ``id_set`` is True.
    """
    hits = found["hits"]["hits"]
    docs = [hit["_source"] for hit in hits]
    ids = [hit["_id"] for hit in hits]
    if id_set:
        return docs, set(ids)
    return docs, ids


def hits_by_id(found):
    """Index an Elasticsearch response by document id.

    Returns a dict mapping each hit's ``_id`` to its ``_source`` payload.
    """
    by_id = {}
    for hit in found["hits"]["hits"]:
        by_id[hit["_id"]] = hit["_source"]
    return by_id


def populate_user_metadata_es(user, current_user):
user["total_balance"] = str(
int(user.get("balance", "0") or "0")
Expand Down
3 changes: 3 additions & 0 deletions discovery-provider/src/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,9 @@ def get_valid_multiaddr_from_id_json(id_json):


def encode_int_id(id: int):
    """Hashid-encode an integer id for external use.

    A str argument is assumed to be an already-encoded id and is
    returned unchanged, so the function is safe to apply twice.
    """
    if not isinstance(id, str):
        return cast(str, hashids.encode(id))
    return id


Expand Down