-
Notifications
You must be signed in to change notification settings - Fork 111
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[C-4660] POC: Pre-fetch CN stream urls (no /stream redirects) #8741
Changes from all commits
2e7877a
4913ce0
6e4e65c
255ac43
62a380f
3f38a4e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -84,6 +84,7 @@ const FULL_ENDPOINT_MAP = { | |
topGenreUsers: '/users/genre/top', | ||
topArtists: '/users/top', | ||
getTrack: (trackId: OpaqueID) => `/tracks/${trackId}`, | ||
getTrackStreamUrl: (trackId: OpaqueID) => `/tracks/${trackId}/stream-url`, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could use sdk for this instead of api client which we are trying to move away from. |
||
getTracks: () => `/tracks`, | ||
getTrackByHandleAndSlug: `/tracks`, | ||
getStems: (trackId: OpaqueID) => `/tracks/${trackId}/stems`, | ||
|
@@ -123,6 +124,17 @@ type GetTrackArgs = { | |
abortOnUnreachable?: boolean | ||
} | ||
|
||
type GetTrackStreamUrlArgs = { | ||
id: ID | ||
currentUserId?: Nullable<ID> | ||
queryParams: QueryParams | ||
unlistedArgs?: { | ||
urlTitle: string | ||
handle: string | ||
} | ||
abortOnUnreachable?: boolean | ||
} | ||
|
||
type GetTracksArgs = { | ||
ids: ID[] | ||
currentUserId: Nullable<ID> | ||
|
@@ -650,6 +662,32 @@ export class AudiusAPIClient { | |
return adapted | ||
} | ||
|
||
async getTrackStreamUrl( | ||
{ | ||
id, | ||
currentUserId, | ||
queryParams, | ||
abortOnUnreachable | ||
}: GetTrackStreamUrlArgs, | ||
retry = true | ||
) { | ||
const encodedTrackId = this._encodeOrThrow(id) | ||
// const encodedCurrentUserId = encodeHashId(currentUserId ?? null) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👻 |
||
|
||
this._assertInitialized() | ||
|
||
const trackUrl = await this._getResponse<APIResponse<string>>( | ||
FULL_ENDPOINT_MAP.getTrackStreamUrl(encodedTrackId), | ||
queryParams, | ||
retry, | ||
PathType.VersionPath, | ||
undefined, | ||
abortOnUnreachable | ||
) | ||
|
||
return trackUrl?.data | ||
} | ||
|
||
async getTracks({ ids, currentUserId }: GetTracksArgs) { | ||
this._assertInitialized() | ||
const encodedTrackIds = ids.map((id) => this._encodeOrThrow(id)) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -461,6 +461,162 @@ def get(self, track_id): | |
abort_not_found(track_id, ns) | ||
|
||
|
||
stream_url_parser = reqparse.RequestParser(argument_class=DescriptiveArgument) | ||
stream_url_parser.add_argument( | ||
"preview", | ||
description="""Optional - true if streaming track preview""", | ||
type=inputs.boolean, | ||
required=False, | ||
default=False, | ||
) | ||
stream_url_parser.add_argument( | ||
"user_signature", | ||
description="""Optional - signature from the requesting user's wallet. | ||
This is needed to authenticate the user and verify access in case the track is gated.""", | ||
type=str, | ||
) | ||
stream_url_parser.add_argument( | ||
"user_data", | ||
description="""Optional - data which was used to generate the optional signature argument.""", | ||
type=str, | ||
) | ||
stream_url_parser.add_argument( | ||
"nft_access_signature", | ||
description="""Optional - gated content signature for this track which was previously generated by a registered DN. | ||
We perform checks on it and pass it through to CN.""", | ||
type=str, | ||
) | ||
stream_url_parser.add_argument( | ||
"skip_play_count", | ||
description="""Optional - boolean that disables tracking of play counts.""", | ||
type=bool, | ||
required=False, | ||
default=False, | ||
) | ||
stream_url_parser.add_argument( | ||
"api_key", | ||
description="""Optional - API key for third party apps. This is required for tracks that only allow specific API keys.""", | ||
type=str, | ||
required=False, | ||
default=None, | ||
) | ||
|
||
|
||
@ns.route("/<string:track_id>/stream-url") | ||
class TrackStreamUrl(Resource): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What you think about adding a query param to existing stream endpoint: This would save us from copy-pasting existing endpoint. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think our goal here is just to see how fast this feels on staging, so maybe we can revise this in a group call after we get some feel of the efficacy. We might just drop this endpoint too in future. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep, in hindsight the query param idea was better but like Ray said, it doesn't really matter for this change. |
||
@record_metrics | ||
@ns.doc( | ||
id="""Inspect Track""", | ||
description="""Inspect a track""", | ||
params={"track_id": "A Track ID"}, | ||
responses={ | ||
200: "Success", | ||
400: "Bad request", | ||
500: "Server error", | ||
}, | ||
) | ||
@ns.expect(stream_url_parser) | ||
@cache(ttl_sec=5) | ||
def get(self, track_id): | ||
""" | ||
POC: An alternate form of the /stream url. | ||
Instead of redirecting to the content node, it just returns the url as a string | ||
This means the client can pre-fetch the url | ||
""" | ||
request_args = stream_parser.parse_args() | ||
is_preview = request_args.get("preview") | ||
user_data = request_args.get("user_data") | ||
user_signature = request_args.get("user_signature") | ||
nft_access_signature = request_args.get("nft_access_signature") | ||
api_key = request_args.get("api_key") | ||
|
||
decoded_id = decode_with_abort(track_id, ns) | ||
|
||
info = get_track_access_info(decoded_id) | ||
track = info.get("track") | ||
|
||
if not track: | ||
logger.error( | ||
f"tracks.py | stream | Track with id {track_id} may not exist. Please investigate." | ||
) | ||
abort_not_found(track_id, ns) | ||
elif (track["allowed_api_keys"] and not api_key) or ( | ||
api_key | ||
and track["allowed_api_keys"] | ||
and api_key.lower() not in track["allowed_api_keys"] | ||
): | ||
logger.error( | ||
f"tracks.py | stream | Streaming track {track_id} does not allow streaming from api key {api_key}." | ||
) | ||
abort_not_found(track_id, ns) | ||
redis = redis_connection.get_redis() | ||
|
||
# signature for the track to be included as a query param in the redirect to CN | ||
stream_signature = get_track_stream_signature( | ||
{ | ||
"track": track, | ||
"is_preview": is_preview, | ||
"user_data": user_data, | ||
"user_signature": user_signature, | ||
"nft_access_signature": nft_access_signature, | ||
} | ||
) | ||
|
||
if not stream_signature: | ||
abort_not_found(track_id, ns) | ||
|
||
signature = stream_signature["signature"] | ||
cid = stream_signature["cid"] | ||
params = {"signature": json.dumps(signature)} | ||
skip_play_count = request_args.get("skip_play_count", False) | ||
if skip_play_count: | ||
params["skip_play_count"] = skip_play_count | ||
|
||
base_path = f"tracks/cidstream/{cid}" | ||
query_string = urllib.parse.urlencode(params, quote_via=urllib.parse.quote) | ||
path = f"{base_path}?{query_string}" | ||
|
||
# we cache track cid -> content node so we can avoid | ||
# checking multiple content nodes for a track | ||
# if we already know where to look | ||
redis_key = f"track_cid:{cid}" | ||
cached_content_node = redis.get(redis_key) | ||
stream_url = NotImplemented | ||
if cached_content_node: | ||
cached_content_node = cached_content_node.decode("utf-8") | ||
stream_url = get_stream_url_from_content_node(cached_content_node, path) | ||
if stream_url: | ||
return success_response(stream_url) | ||
|
||
healthy_nodes = get_all_healthy_content_nodes_cached(redis) | ||
if not healthy_nodes: | ||
logger.error( | ||
f"tracks.py | stream | No healthy Content Nodes found when streaming track ID {track_id}. Please investigate." | ||
) | ||
abort_not_found(track_id, ns) | ||
|
||
rendezvous = RendezvousHash( | ||
*[re.sub("/$", "", node["endpoint"].lower()) for node in healthy_nodes] | ||
) | ||
|
||
content_nodes = rendezvous.get_n(9999999, cid) | ||
|
||
# if track has placement_hosts, use that instead | ||
if track.get("placement_hosts"): | ||
content_nodes = track.get("placement_hosts").split(",") | ||
|
||
for content_node in content_nodes: | ||
try: | ||
stream_url = get_stream_url_from_content_node(content_node, path) | ||
if stream_url: | ||
redis.set(redis_key, content_node) | ||
redis.expire(redis_key, 60 * 30) # 30 min ttl | ||
return success_response(stream_url) | ||
except Exception as e: | ||
logger.error(f"Could not locate cid {cid} on {content_node}: {e}") | ||
abort_not_found(track_id, ns) | ||
|
||
|
||
# Stream | ||
|
||
stream_parser = reqparse.RequestParser(argument_class=DescriptiveArgument) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
happy to help look here tomorrow!