Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

record-tester: Add support for copy-only recordings #356

Merged
merged 5 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cmd/recordtester/recordtester.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ func main() {
testTranscode := fs.Bool("transcode", false, "Check Transcode API workflow")
catalystPipelineStrategy := fs.String("catalyst-pipeline-strategy", "", "Which catalyst pipeline strategy to use regarding. The appropriate values are defined by catalyst-api itself.")
recordObjectStoreId := fs.String("record-object-store-id", "", "ID for the Object Store to use for recording storage. Forwarded to the streams created in the API")
recordingSpecStr := fs.String("recording-spec", "", "JSON object with the `recordingSpec` field to use in the test streams. Forwarded to the streams created in the API")

// Discord related flags
discordURL := fs.String("discord-url", "", "URL of Discord's webhook to send messages to Discord channel")
Expand Down Expand Up @@ -275,6 +276,13 @@ func main() {
}
}

var recordingSpec *api.RecordingSpec
if *recordingSpecStr != "" {
if err := json.Unmarshal([]byte(*recordingSpecStr), &recordingSpec); err != nil {
glog.Fatalf("Error parsing --recording-spec argument: %v", err)
}
}

serfMembers, err := getSerfMembers(*useSerf, *serfRPCAddr)
if err != nil {
glog.Fatalf("failed to process serf members: %v", err)
Expand Down Expand Up @@ -338,6 +346,7 @@ func main() {
Analyzers: lanalyzers,
Ingest: ingest,
RecordObjectStoreId: *recordObjectStoreId,
RecordingSpec: recordingSpec,
UseForceURL: *forceRecordingUrl,
RecordingWaitTime: *recordingWaitTime,
UseHTTP: *useHttp,
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ require (
github.com/Necroforger/dgrouter v0.0.0-20200517224846-e66453b957c1
github.com/PagerDuty/go-pagerduty v1.7.0
github.com/bwmarrin/discordgo v0.27.1
github.com/golang/glog v1.1.2
github.com/golang/glog v1.2.1
github.com/gosuri/uilive v0.0.3 // indirect
github.com/gosuri/uiprogress v0.0.1
github.com/livepeer/go-api-client v0.4.14-0.20231130155418-dd87e78bea93
github.com/livepeer/go-api-client v0.4.24-0.20240607131835-949d242a631b
github.com/livepeer/go-livepeer v0.7.2-0.20231110152159-b17a70dfe719
github.com/livepeer/joy4 v0.1.2-0.20220210094601-95e4d28f5f07
github.com/livepeer/leaderboard-serverless v1.0.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,8 @@ github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOW
github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4=
github.com/golang/glog v1.1.2 h1:DVjP2PbBOzHyzA+dn3WhHIq4NdVu3Q+pvivFICf/7fo=
github.com/golang/glog v1.1.2/go.mod h1:zR+okUeTbrL6EL3xHUDxZuEtGv04p5shwip1+mL/rLQ=
github.com/golang/glog v1.2.1 h1:OptwRhECazUx5ix5TTWC3EZhsZEHWcYWY4FQHTIubm4=
github.com/golang/glog v1.2.1/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
Expand Down Expand Up @@ -555,8 +555,8 @@ github.com/libp2p/go-netroute v0.2.0 h1:0FpsbsvuSnAhXFnCY0VLFbJOzaK0VnP0r1QT/o4n
github.com/libp2p/go-openssl v0.1.0 h1:LBkKEcUv6vtZIQLVTegAil8jbNpJErQ9AnT+bWV+Ooo=
github.com/livepeer/catalyst-api v0.1.1 h1:WP4rHH88b+lsxo33wPCjl0yvqVDNyxkleZH1sA0M5GE=
github.com/livepeer/catalyst-api v0.1.1/go.mod h1:d6XPE9ehhCutWhCqqcmlYqQa+e9bf3Ke92x+gRZlzoQ=
github.com/livepeer/go-api-client v0.4.14-0.20231130155418-dd87e78bea93 h1:vQYapLFJ9EyRWTjOsJr1ullF0wiazRme2fSJDZnFrIs=
github.com/livepeer/go-api-client v0.4.14-0.20231130155418-dd87e78bea93/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw=
github.com/livepeer/go-api-client v0.4.24-0.20240607131835-949d242a631b h1:J8cWLpnTINGAWVPU503SnUhTmuDhXnm9QxpC/CFGk2k=
github.com/livepeer/go-api-client v0.4.24-0.20240607131835-949d242a631b/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw=
github.com/livepeer/go-livepeer v0.7.2-0.20231110152159-b17a70dfe719 h1:468kFmwQFaI00eNCLL8qA5XuIBMwqqVgKEXvqS7msa8=
github.com/livepeer/go-livepeer v0.7.2-0.20231110152159-b17a70dfe719/go.mod h1:d6qTStiNmXTQ/5YLB9fhzgDV9MdXg3KmqESQpur2Ak0=
github.com/livepeer/go-tools v0.3.0 h1:xK0mJyPWWyvj9Oi9nfLglhCtk0KM8883WB7VO1oPF8g=
Expand Down
153 changes: 107 additions & 46 deletions internal/app/recordtester/recordtester_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"math/rand"
"net/http"
"os"
"strings"
"time"

"github.com/golang/glog"
Expand Down Expand Up @@ -51,6 +52,7 @@ type (
Analyzers testers.AnalyzerByRegion
Ingest *api.Ingest
RecordObjectStoreId string
RecordingSpec *api.RecordingSpec
UseForceURL bool
RecordingWaitTime time.Duration
UseHTTP bool
Expand All @@ -65,6 +67,7 @@ type (
lanalyzers testers.AnalyzerByRegion
ingest *api.Ingest
recordObjectStoreId string
recordingSpec *api.RecordingSpec
useForceURL bool
recordingWaitTime time.Duration
useHTTP bool
Expand All @@ -89,6 +92,7 @@ func NewRecordTester(gctx context.Context, opts RecordTesterOptions, serfOpts Se
ctx: ctx,
cancel: cancel,
recordObjectStoreId: opts.RecordObjectStoreId,
recordingSpec: opts.RecordingSpec,
useForceURL: opts.UseForceURL,
recordingWaitTime: opts.RecordingWaitTime,
useHTTP: opts.UseHTTP,
Expand Down Expand Up @@ -132,7 +136,12 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
streamName := fmt.Sprintf("%s_%s", hostName, time.Now().Format("2006-01-02T15:04:05Z07:00"))
var stream *api.Stream
for {
stream, err = rt.lapi.CreateStream(api.CreateStreamReq{Name: streamName, Record: true, RecordObjectStoreId: rt.recordObjectStoreId})
stream, err = rt.lapi.CreateStream(api.CreateStreamReq{
Name: streamName,
Record: true,
RecordingSpec: rt.recordingSpec,
RecordObjectStoreId: rt.recordObjectStoreId,
})
if err != nil {
if testers.Timedout(err) && apiTry < 3 {
apiTry++
Expand Down Expand Up @@ -247,6 +256,16 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
if err := rt.isCancelled(); err != nil {
return 0, err
}

lapiNoAPIKey := api.NewAPIClient(api.ClientOptions{
Server: rt.lapi.GetServer(),
AccessToken: "", // test playback info call without API key
Timeout: 8 * time.Second,
})
if code, err := checkPlaybackInfo(stream.PlaybackID, rt.lapi, lapiNoAPIKey); err != nil {
return code, err
}

glog.Infof("Waiting 10 seconds. streamId=%s playbackId=%s", stream.ID, stream.PlaybackID)
time.Sleep(10 * time.Second)
// now get sessions
Expand Down Expand Up @@ -284,33 +303,84 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
}

glog.Infof("Streaming done, waiting for recording URL to appear. streamId=%s playbackId=%s", stream.ID, stream.PlaybackID)

deadline := time.Now().Add(rt.recordingWaitTime)
if rt.useForceURL {
deadline = time.Now().Add(5 * time.Second)
}

// For checking if sourcePlayback was available we see if at least 1 session
// recording (asset) got a playbackUrl before the processing was done.
var sourcePlayback bool
for errCode, errs := -1, []error{}; errCode != 0; {
if time.Now().After(deadline) {
errsStrs := make([]string, len(errs))
for i, err := range errs {
errsStrs[i] = err.Error()
}
err := fmt.Errorf("timeout waiting for recording URL to appear: %s", strings.Join(errsStrs, "; "))
return errCode, err
} else if err = rt.isCancelled(); err != nil {
return 0, err
}
time.Sleep(5 * time.Second)
} else {
time.Sleep(rt.recordingWaitTime)

errCode, errs = 0, nil
for _, sess := range sessions {
// currently the assetID is the same as the sessionID so we could just query on that but just in case that
// ever changes, we can use the ListAssets call to find the asset
assets, _, err := rt.lapi.ListAssets(api.ListOptions{
Limit: 1,
Filters: map[string]interface{}{
"sourceSessionId": sess.ID,
},
})
if err != nil {
errCode, errs = 248, append(errs, err)
continue
}

if len(assets) != 1 {
err := fmt.Errorf("unexpected number of assets. expected: 1 actual: %d", len(assets))
errCode, errs = 247, append(errs, err)
continue
}
asset := assets[0]

if code, err := checkPlaybackInfo(asset.PlaybackID, rt.lapi, lapiNoAPIKey); err != nil {
errCode, errs = code, append(errs, err)
} else {
// if we get playback before the processing is done it means source playback was provided
if asset.Status.Phase != "ready" {
sourcePlayback = true
}
}

if asset.Status.Phase != "ready" {
err := fmt.Errorf("asset status is %s but should be ready", asset.Status.Phase)
errCode, errs = 246, append(errs, err)
}
}
}
if err = rt.isCancelled(); err != nil {
return 0, err
if !sourcePlayback {
return 246, errors.New("source playback was not provided")
}

sessions, err = rt.lapi.GetSessionsNew(stream.ID, rt.useForceURL)
// check actual recordings playback
sessions, err = rt.lapi.GetSessionsNew(stream.ID, false)
if err != nil {
err := fmt.Errorf("error getting sessions for stream id=%s err=%v", stream.ID, err)
glog.Errorf("Error getting sessions err=%v streamId=%s playbackId=%s", err, stream.ID, stream.PlaybackID)
return 252, err
}
glog.V(model.DEBUG).Infof("Sessions: %+v streamId=%s playbackId=%s", sessions, stream.ID, stream.PlaybackID)
if err = rt.isCancelled(); err != nil {
return 0, err
}

lapiNoAPIKey := api.NewAPIClient(api.ClientOptions{
Server: rt.lapi.GetServer(),
AccessToken: "", // test playback info call without API key
Timeout: 8 * time.Second,
})
if code, err := checkPlaybackInfo(stream.PlaybackID, rt.lapi, lapiNoAPIKey); err != nil {
return code, err
if len(sessions) != expectedSessions {
err := fmt.Errorf("invalid session count, expected %d but got %d",
expectedSessions, len(sessions))
glog.Error(err)
return 251, err
}

for _, sess := range sessions {
statusShould := api.RecordingStatusReady
if rt.useForceURL {
Expand All @@ -330,40 +400,21 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
return 0, err
}
if rt.mp4 {
es, err := rt.checkDownMp4(stream, sess.Mp4Url, testDuration)
es, err := rt.checkRecordingMp4(stream, sess.Mp4Url, testDuration)
if err != nil {
return es, err
}
}

es, err := rt.checkDown(stream, sess.RecordingURL, testDuration)
if err != nil {
return es, err
if err = rt.isCancelled(); err != nil {
return 0, err
}

// currently the assetID is the same as the sessionID so we could just query on that but just in case that
// ever changes, we can use the ListAssets call to find the asset
assets, _, err := rt.lapi.ListAssets(api.ListOptions{
Limit: 1,
Filters: map[string]interface{}{
"sourceSessionId": sess.ID,
},
})
es, err := rt.checkRecordingHls(stream, sess.RecordingURL, testDuration)
if err != nil {
return 248, err
}

if len(assets) != 1 {
return 247, fmt.Errorf("unexpected number of assets. expected: 1 actual: %d", len(assets))
}
if !assets[0].SourcePlaybackReady {
return 246, fmt.Errorf("source playback was not ready")
}

if code, err := checkPlaybackInfo(assets[0].PlaybackID, rt.lapi, lapiNoAPIKey); err != nil {
return code, err
return es, err
}
}

glog.Infof("Done Record Test. streamId=%s playbackId=%s", stream.ID, stream.PlaybackID)

rt.lapi.DeleteStream(stream.ID)
Expand Down Expand Up @@ -418,7 +469,13 @@ func (rt *recordTester) doOneHTTPStream(fileName, streamName, broadcasterURL str
var err error
apiTry := 0
for {
session, err = rt.lapi.CreateStream(api.CreateStreamReq{Name: streamName, Record: true, RecordObjectStoreId: rt.recordObjectStoreId, ParentID: stream.ID})
session, err = rt.lapi.CreateStream(api.CreateStreamReq{
Name: streamName,
Record: true,
RecordingSpec: rt.recordingSpec,
RecordObjectStoreId: rt.recordObjectStoreId,
ParentID: stream.ID,
})
if err != nil {
if testers.Timedout(err) && apiTry < 3 {
apiTry++
Expand Down Expand Up @@ -449,7 +506,7 @@ func (rt *recordTester) isCancelled() error {
return nil
}

func (rt *recordTester) checkDownMp4(stream *api.Stream, url string, streamDuration time.Duration) (int, error) {
func (rt *recordTester) checkRecordingMp4(stream *api.Stream, url string, streamDuration time.Duration) (int, error) {
es := 0
started := time.Now()
glog.V(model.VERBOSE).Infof("Downloading mp4 url=%s streamId=%s playbackId=%s", url, stream.ID, stream.PlaybackID)
Expand Down Expand Up @@ -500,7 +557,7 @@ func (rt *recordTester) checkDownMp4(stream *api.Stream, url string, streamDurat
return es, nil
}

func (rt *recordTester) checkDown(stream *api.Stream, url string, streamDuration time.Duration) (int, error) {
func (rt *recordTester) checkRecordingHls(stream *api.Stream, url string, streamDuration time.Duration) (int, error) {
es := 0
started := time.Now()
downloader := testers.NewM3utester2(rt.ctx, url, false, false, false, false, 5*time.Second, nil, false)
Expand All @@ -511,7 +568,11 @@ func (rt *recordTester) checkDown(stream *api.Stream, url string, streamDuration
}
vs := downloader.VODStats()
rt.vodStats = vs
if len(vs.SegmentsNum) != len(api.StandardProfiles)+1 {
expectedProfiles := len(api.StandardProfiles) + 1
if rt.recordingSpec != nil && rt.recordingSpec.Profiles != nil {
expectedProfiles = len(*rt.recordingSpec.Profiles) + 1
}
if len(vs.SegmentsNum) != expectedProfiles {
glog.Warningf("Number of renditions doesn't match! Has %d should %d. streamId=%s playbackId=%s", len(vs.SegmentsNum), len(api.StandardProfiles)+1, stream.ID, stream.PlaybackID)
es = 35
}
Expand Down
Loading