Skip to content

Commit

Permalink
tag search on elasticsearch (#3302)
Browse files Browse the repository at this point in the history
* tag search on elasticsearch

* lower_keyword for all tag mappings.

skip tags matview refresh

* use tracks.tag_list field

* attempt to match existing endpoint response

* legacy_mode flag for es finalize_response

for non-v1 endpoints

* check for existing esclient
  • Loading branch information
stereosteve authored Jun 27, 2022
1 parent 45c65eb commit 215f3d8
Show file tree
Hide file tree
Showing 11 changed files with 169 additions and 62 deletions.
7 changes: 7 additions & 0 deletions discovery-provider/es-indexer/src/helpers/splitTags.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/**
 * Parse a comma-separated tag string into an array of tag names.
 * Returns an empty array for null/empty input; whitespace around each
 * tag is trimmed and empty entries are dropped.
 */
export function splitTags(tags: string | null): string[] {
  if (!tags) return []
  const parts = tags.split(',')
  const trimmed = parts.map((part) => part.trim())
  return trimmed.filter((part) => part.length > 0)
}
6 changes: 3 additions & 3 deletions discovery-provider/es-indexer/src/indexNames.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Versioned Elasticsearch index names.
// A suffix is bumped whenever the corresponding mapping changes so a
// fresh index gets created and re-populated. playlists/tracks/users are
// bumped here to pick up the new lowercased keyword tag mappings.
// NOTE: the original span contained diff residue (old and new values for
// the same keys interleaved); this is the post-change object.
export const indexNames = {
  playlists: 'playlists7',
  reposts: 'reposts2',
  saves: 'saves2',
  tracks: 'tracks7',
  users: 'users7',
}
8 changes: 6 additions & 2 deletions discovery-provider/es-indexer/src/indexers/PlaylistIndexer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { IndicesCreateRequest } from '@elastic/elasticsearch/lib/api/types'
import { keyBy, merge } from 'lodash'
import { dialPg } from '../conn'
import { splitTags } from '../helpers/splitTags'
import { indexNames } from '../indexNames'
import { BlocknumberCheckpoint } from '../types/blocknumber_checkpoint'
import { PlaylistDoc } from '../types/docs'
Expand Down Expand Up @@ -77,7 +78,10 @@ export class PlaylistIndexer extends BaseIndexer<PlaylistDoc> {
properties: {
mood: { type: 'keyword' },
genre: { type: 'keyword' },
tags: { type: 'keyword' },
tags: {
type: 'keyword',
normalizer: 'lower_asciifolding',
},
play_count: { type: 'integer' },
repost_count: { type: 'integer' },
save_count: { type: 'integer' },
Expand Down Expand Up @@ -215,7 +219,7 @@ export class PlaylistIndexer extends BaseIndexer<PlaylistDoc> {
and track_id in (${idList})`
const allTracks = await pg.query(q)
for (let t of allTracks.rows) {
t.tags = t.tags?.split(',').filter(Boolean)
t.tags = splitTags(t.tags)
}
return keyBy(allTracks.rows, 'track_id')
}
Expand Down
9 changes: 5 additions & 4 deletions discovery-provider/es-indexer/src/indexers/TrackIndexer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { IndicesCreateRequest } from '@elastic/elasticsearch/lib/api/types'
import { merge } from 'lodash'
import { splitTags } from '../helpers/splitTags'
import { indexNames } from '../indexNames'
import { BlocknumberCheckpoint } from '../types/blocknumber_checkpoint'
import { TrackDoc } from '../types/docs'
Expand Down Expand Up @@ -58,9 +59,9 @@ export class TrackIndexer extends BaseIndexer<TrackDoc> {
},
},
length: { type: 'integer' },
tags: {
type: 'text',
analyzer: 'comma_analyzer',
tag_list: {
type: 'keyword',
normalizer: 'lower_asciifolding',
},
genre: { type: 'keyword' },
mood: { type: 'keyword' },
Expand Down Expand Up @@ -186,7 +187,7 @@ export class TrackIndexer extends BaseIndexer<TrackDoc> {
row.suggest = [row.title, row.user.handle, row.user.name]
.filter((x) => x)
.join(' ')
row.tags = row.tags
row.tag_list = splitTags(row.tags)
row.repost_count = row.reposted_by.length
row.favorite_count = row.saved_by.length
row.duration = Math.ceil(
Expand Down
5 changes: 3 additions & 2 deletions discovery-provider/es-indexer/src/indexers/UserIndexer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { IndicesCreateRequest } from '@elastic/elasticsearch/lib/api/types'
import { groupBy, keyBy, merge } from 'lodash'
import { dialPg } from '../conn'
import { splitTags } from '../helpers/splitTags'
import { indexNames } from '../indexNames'
import { BlocknumberCheckpoint } from '../types/blocknumber_checkpoint'
import { UserDoc } from '../types/docs'
Expand Down Expand Up @@ -61,7 +62,7 @@ export class UserIndexer extends BaseIndexer<UserDoc> {
properties: {
mood: { type: 'keyword' },
genre: { type: 'keyword' },
tags: { type: 'keyword' },
tags: { type: 'keyword', normalizer: 'lower_asciifolding' },
},
},
},
Expand Down Expand Up @@ -172,7 +173,7 @@ export class UserIndexer extends BaseIndexer<UserDoc> {
`
const allTracks = await pg.query(q)
for (let t of allTracks.rows) {
t.tags = t.tags?.split(',').filter(Boolean)
t.tags = splitTags(t.tags)
}
const grouped = groupBy(allTracks.rows, 'owner_id')
return grouped
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ import {

export const sharedIndexSettings: IndicesIndexSettings = {
analysis: {
normalizer: {
lower_asciifolding: {
type: 'custom',
filter: ['asciifolding', 'lowercase'],
},
},
analyzer: {
standard_asciifolding: {
type: 'custom',
Expand Down
2 changes: 1 addition & 1 deletion discovery-provider/es-indexer/src/types/docs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export type TrackDoc = TrackRow & {
saved_by: number[]
routes: string[]
permalink: string
tags: string // comma separated
tag_list: string[]
repost_count: number
favorite_count: number
play_count: any // todo: is it a string or number? pg returns string
Expand Down
169 changes: 128 additions & 41 deletions discovery-provider/src/queries/search_es.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import Any, Dict
from typing import Any, Dict, Optional

from src.api.v1.helpers import (
extend_favorite,
Expand Down Expand Up @@ -123,18 +123,7 @@ def search_es_full(args: dict):
]
)

# add size and limit with some
# over-fetching for sake of drop_copycats
index_name = ""
for dsl in mdsl:
if "index" in dsl:
index_name = dsl["index"]
continue
dsl["size"] = limit
dsl["from"] = offset
if index_name == ES_USERS:
dsl["size"] = limit + 5

mdsl_limit_offset(mdsl, limit, offset)
mfound = esclient.msearch(searches=mdsl)

response: Dict = {
Expand Down Expand Up @@ -168,22 +157,105 @@ def search_es_full(args: dict):
if current_user_id:
response["saved_albums"] = pluck_hits(mfound["responses"].pop(0))

finalize_response(response, limit, current_user_id)
return response


def search_tags_es(q: str, kind="all", current_user_id=None, limit=0, offset=0):
    """Search tracks and/or users by tag via Elasticsearch.

    Mirrors the legacy (postgres) tag search response shape, which is why
    finalize_response is invoked with legacy_mode=True.

    Args:
        q: the tag query string.
        kind: "all", "tracks" or "users".
        current_user_id: optional viewer id; adds personalized
            saved_tracks / followed_users result buckets.
        limit, offset: pagination, applied per-search via mdsl_limit_offset.
    """
    if not esclient:
        raise Exception("esclient is None")

    want_tracks = kind in ("all", "tracks")
    want_users = kind in ("all", "users")
    mdsl: Any = []

    def tag_match(fieldname):
        # Minimal bool query over the tag field. must_not/should are kept
        # (empty) so personalization clauses can be appended below.
        return {
            "query": {
                "bool": {
                    "must": [{"match": {fieldname: {"query": q}}}],
                    "must_not": [],
                    "should": [],
                }
            }
        }

    if want_tracks:
        mdsl.extend([{"index": ES_TRACKS}, tag_match("tag_list")])
        if current_user_id:
            saved_dsl = tag_match("tag_list")
            saved_dsl["query"]["bool"]["must"].append(be_saved(current_user_id))
            mdsl.extend([{"index": ES_TRACKS}, saved_dsl])

    if want_users:
        mdsl.extend([{"index": ES_USERS}, tag_match("tracks.tags")])
        if current_user_id:
            followed_dsl = tag_match("tracks.tags")
            followed_dsl["query"]["bool"]["must"].append(be_followed(current_user_id))
            mdsl.extend([{"index": ES_USERS}, followed_dsl])

    mdsl_limit_offset(mdsl, limit, offset)
    mfound = esclient.msearch(searches=mdsl)

    response: Dict = {
        "tracks": [],
        "saved_tracks": [],
        "users": [],
        "followed_users": [],
    }

    # msearch responses come back in request order; pop them off in the
    # same order the searches were appended above.
    if want_tracks:
        response["tracks"] = pluck_hits(mfound["responses"].pop(0))
        if current_user_id:
            response["saved_tracks"] = pluck_hits(mfound["responses"].pop(0))

    if want_users:
        response["users"] = pluck_hits(mfound["responses"].pop(0))
        if current_user_id:
            response["followed_users"] = pluck_hits(mfound["responses"].pop(0))

    finalize_response(response, limit, current_user_id, legacy_mode=True)
    return response


def mdsl_limit_offset(mdsl, limit, offset):
    """Apply pagination to every search body in an msearch request list.

    mdsl alternates header dicts (containing "index") and body dicts.
    User searches over-fetch by 5 so drop_copycats can discard
    impersonator accounts and still return `limit` results.
    """
    current_index = ""
    for entry in mdsl:
        if "index" in entry:
            # Header entry: remember which index the next body targets.
            current_index = entry["index"]
            continue
        entry["from"] = offset
        entry["size"] = limit + 5 if current_index == ES_USERS else limit


def finalize_response(
response: Dict, limit: int, current_user_id: Optional[int], legacy_mode=False
):
"""Hydrates users and contextualizes results for current user (if applicable).
Also removes extra indexed fields so as to match the fieldset from postgres.
legacy_mode=True will skip v1 api transforms.
This is similar to the code in get_feed_es (which does its own thing,
but could one day use finalize_response with legacy_mode=True).
e.g. doesn't encode IDs and populates `followee_saves` instead of `followee_reposts`
"""
if not esclient:
raise Exception("esclient is None")

# hydrate users, saves, reposts
item_keys = []
user_ids = set()
if current_user_id:
user_ids.add(current_user_id)

# collect keys for fetching
for k in [
"tracks",
"saved_tracks",
"playlists",
"saved_playlists",
"albums",
"saved_albums",
]:
for item in response[k]:
for items in response.values():
for item in items:
item_keys.append(item_key(item))
user_ids.add(item.get("owner_id", item.get("playlist_owner_id")))

Expand All @@ -192,7 +264,8 @@ def search_es_full(args: dict):
current_user = None

if user_ids:
users_mget = esclient.mget(index=ES_USERS, ids=list(user_ids))
ids = [str(id) for id in user_ids]
users_mget = esclient.mget(index=ES_USERS, ids=ids)
users_by_id = {d["_id"]: d["_source"] for d in users_mget["docs"] if d["found"]}
if current_user_id:
current_user = users_by_id.get(str(current_user_id))
Expand All @@ -209,25 +282,24 @@ def search_es_full(args: dict):
for k in ["tracks", "saved_tracks"]:
tracks = response[k]
hydrate_user(tracks, users_by_id)
hydrate_saves_reposts(tracks, follow_saves, follow_reposts)
response[k] = transform_tracks(tracks, users_by_id, current_user)
hydrate_saves_reposts(tracks, follow_saves, follow_reposts, legacy_mode)
response[k] = [map_track(track, current_user, legacy_mode) for track in tracks]

# users: finalize
for k in ["users", "followed_users"]:
users = drop_copycats(response[k])
users = users[:limit]
response[k] = [
extend_user(populate_user_metadata_es(user, current_user)) for user in users
]
response[k] = [map_user(user, current_user, legacy_mode) for user in users]

# playlists: finalize
for k in ["playlists", "saved_playlists"]:
for k in ["playlists", "saved_playlists", "albums", "saved_albums"]:
if k not in response:
continue
playlists = response[k]
hydrate_saves_reposts(playlists, follow_saves, follow_reposts)
hydrate_saves_reposts(playlists, follow_saves, follow_reposts, legacy_mode)
hydrate_user(playlists, users_by_id)
response[k] = [
extend_playlist(populate_track_or_playlist_metadata_es(item, current_user))
for item in playlists
map_playlist(playlist, current_user, legacy_mode) for playlist in playlists
]

return response
Expand Down Expand Up @@ -403,18 +475,33 @@ def hydrate_user(items, users_by_id):
item["user"] = user


def hydrate_saves_reposts(items, follow_saves, follow_reposts, legacy_mode):
    """Attach followee repost/save context to each item, in place.

    legacy_mode=True keeps raw docs under `followee_saves` (matching the
    legacy/postgres response shape); otherwise docs go through the v1
    extend_* transforms under `followee_favorites`.
    NOTE: the original span contained diff residue (the pre-change body
    lines interleaved with the new ones); this is the post-change version.
    """
    for item in items:
        ik = item_key(item)
        if legacy_mode:
            item["followee_reposts"] = follow_reposts[ik]
            item["followee_saves"] = follow_saves[ik]
        else:
            item["followee_reposts"] = [extend_repost(r) for r in follow_reposts[ik]]
            item["followee_favorites"] = [extend_favorite(x) for x in follow_saves[ik]]


def map_user(user, current_user, legacy_mode):
    """Personalize a user doc; apply the v1 extend transform unless legacy_mode."""
    hydrated = populate_user_metadata_es(user, current_user)
    return hydrated if legacy_mode else extend_user(hydrated)

def map_track(track, current_user, legacy_mode):
    """Personalize a track doc; apply the v1 extend transform unless legacy_mode.

    NOTE: the original span was diff residue — the removed transform_tracks
    helper interleaved with the new map_track; this is the post-change
    function (transform_tracks was deleted in this commit).
    """
    track = populate_track_or_playlist_metadata_es(track, current_user)
    if not legacy_mode:
        track = extend_track(track)
    return track
def map_playlist(playlist, current_user, legacy_mode):
    """Personalize a playlist doc; apply the v1 extend transform unless legacy_mode."""
    mapped = populate_track_or_playlist_metadata_es(playlist, current_user)
    return mapped if legacy_mode else extend_playlist(mapped)
7 changes: 6 additions & 1 deletion discovery-provider/src/queries/search_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
user_handle_exact_match_boost,
user_name_weight,
)
from src.queries.search_es import search_es_full
from src.queries.search_es import search_es_full, search_tags_es
from src.queries.search_track_tags import search_track_tags
from src.queries.search_user_tags import search_user_tags
from src.utils.db_session import get_db_read_replica
Expand Down Expand Up @@ -97,6 +97,11 @@ def search_tags():
results = {}

(limit, offset) = get_pagination_vars()

if os.getenv("audius_elasticsearch_search_enabled"):
hits = search_tags_es(search_str, kind, current_user_id, limit, offset)
return api_helpers.success_response(hits)

db = get_db_read_replica()
with db.scoped_session() as session:
if searchKind in [SearchKind.all, SearchKind.tracks]:
Expand Down
10 changes: 3 additions & 7 deletions discovery-provider/src/tasks/index_materialized_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,11 @@


def update_views(self, db):
if os.getenv("audius_elasticsearch_search_enabled"):
return

with db.scoped_session() as session:
start_time = time.time()
if os.getenv("audius_elasticsearch_search_enabled"):
session.execute("REFRESH MATERIALIZED VIEW CONCURRENTLY tag_track_user")
logger.info(
f"index_materialized_views.py | Finished updating tag_track_user in: {time.time() - start_time} sec."
)
return

logger.info("index_materialized_views.py | Updating materialized views")
session.execute("REFRESH MATERIALIZED VIEW CONCURRENTLY user_lexeme_dict")
session.execute("REFRESH MATERIALIZED VIEW CONCURRENTLY track_lexeme_dict")
Expand Down
Loading

0 comments on commit 215f3d8

Please sign in to comment.