Skip to content

Commit

Permalink
record-tester: Inline options field in recordTester struct
Browse files Browse the repository at this point in the history
No need for that indirection, just another layer of copying around.
  • Loading branch information
victorges committed Jun 19, 2024
1 parent e659aa7 commit 7f8c979
Showing 1 changed file with 36 additions and 55 deletions.
91 changes: 36 additions & 55 deletions internal/app/recordtester/recordtester_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,11 @@ type (
}

recordTester struct {
ctx context.Context
cancel context.CancelFunc
lapi *api.Client
lanalyzers testers.AnalyzerByRegion
ingest *api.Ingest
recordObjectStoreId string
recordingSpec *api.RecordingSpec
skipSourcePlayback bool
useForceURL bool
recordingWaitTime time.Duration
useHTTP bool
mp4 bool
streamHealth bool
serfOpts SerfOptions
RecordTesterOptions
serfOpts SerfOptions

ctx context.Context
cancel context.CancelFunc

// mutable fields
streamID string
Expand All @@ -88,19 +79,9 @@ type (
func NewRecordTester(gctx context.Context, opts RecordTesterOptions, serfOpts SerfOptions) IRecordTester {
ctx, cancel := context.WithCancel(gctx)
rt := &recordTester{
lapi: opts.API,
lanalyzers: opts.Analyzers,
ingest: opts.Ingest,
RecordTesterOptions: opts,
ctx: ctx,
cancel: cancel,
recordObjectStoreId: opts.RecordObjectStoreId,
recordingSpec: opts.RecordingSpec,
skipSourcePlayback: opts.SkipSourcePlayback,
useForceURL: opts.UseForceURL,
recordingWaitTime: opts.RecordingWaitTime,
useHTTP: opts.UseHTTP,
mp4: opts.TestMP4,
streamHealth: opts.TestStreamHealth,
serfOpts: serfOpts,
}
return rt
Expand All @@ -115,7 +96,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
}
apiTry := 0
for {
broadcasters, err = rt.lapi.Broadcasters()
broadcasters, err = rt.API.Broadcasters()
if err != nil {
if testers.Timedout(err) && apiTry < 3 {
apiTry++
Expand All @@ -129,21 +110,21 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
glog.V(model.DEBUG).Infof("Got broadcasters: %+v", broadcasters)
glog.V(model.DEBUG).Infof("Streaming video file '%s'\n", fileName)

if rt.useHTTP && len(broadcasters) == 0 {
if rt.UseHTTP && len(broadcasters) == 0 {
return 254, errors.New("empty list of broadcasters")
} else if (!rt.useHTTP && ingest.Ingest == "") || ingest.Playback == "" {
} else if (!rt.UseHTTP && ingest.Ingest == "") || ingest.Playback == "" {
return 254, errors.New("empty ingest URLs")
}

hostName, _ := os.Hostname()
streamName := fmt.Sprintf("%s_%s", hostName, time.Now().Format("2006-01-02T15:04:05Z07:00"))
var stream *api.Stream
for {
stream, err = rt.lapi.CreateStream(api.CreateStreamReq{
stream, err = rt.API.CreateStream(api.CreateStreamReq{
Name: streamName,
Record: true,
RecordingSpec: rt.recordingSpec,
RecordObjectStoreId: rt.recordObjectStoreId,
RecordingSpec: rt.RecordingSpec,
RecordObjectStoreId: rt.RecordObjectStoreId,
})
if err != nil {
if testers.Timedout(err) && apiTry < 3 {
Expand All @@ -169,9 +150,9 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
rtmpURL := fmt.Sprintf("%s/%s", ingest.Ingest, stream.StreamKey)

testerFuncs := []testers.StartTestFunc{}
if rt.streamHealth {
if rt.TestStreamHealth {
testerFuncs = append(testerFuncs, func(ctx context.Context, mediaURL string, waitForTarget time.Duration, opts testers.Streamer2Options) testers.Finite {
return testers.NewStreamHealth(ctx, stream.ID, rt.lanalyzers, 2*time.Minute)
return testers.NewStreamHealth(ctx, stream.ID, rt.Analyzers, 2*time.Minute)
})
}

Expand All @@ -190,7 +171,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
}
glog.V(model.SHORT).Infof("RTMP: %s streamId=%s playbackId=%s", rtmpURL, stream.ID, stream.PlaybackID)
glog.V(model.SHORT).Infof("MEDIA: %s streamId=%s playbackId=%s", mediaURL, stream.ID, stream.PlaybackID)
if rt.useHTTP {
if rt.UseHTTP {
sterr := rt.doOneHTTPStream(fileName, streamName, broadcasters[0], testDuration, stream)
if sterr != nil {
glog.Warningf("Streaming returned error err=%v streamId=%s playbackId=%s", sterr, stream.ID, stream.PlaybackID)
Expand Down Expand Up @@ -261,18 +242,18 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
}

lapiNoAPIKey := api.NewAPIClient(api.ClientOptions{
Server: rt.lapi.GetServer(),
Server: rt.API.GetServer(),
AccessToken: "", // test playback info call without API key
Timeout: 8 * time.Second,
})
if code, err := checkPlaybackInfo(stream.PlaybackID, rt.lapi, lapiNoAPIKey); err != nil {
if code, err := checkPlaybackInfo(stream.PlaybackID, rt.API, lapiNoAPIKey); err != nil {
return code, err
}

glog.Infof("Waiting 10 seconds. streamId=%s playbackId=%s", stream.ID, stream.PlaybackID)
time.Sleep(10 * time.Second)
// now get sessions
sessions, err := rt.lapi.GetSessionsNew(stream.ID, false)
sessions, err := rt.API.GetSessionsNew(stream.ID, false)
if err != nil {
glog.Errorf("Error getting sessions err=%v streamId=%s playbackId=%s", err, stream.ID, stream.PlaybackID)
return 252, err
Expand Down Expand Up @@ -307,8 +288,8 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.

glog.Infof("Streaming done, waiting for recording URL to appear. streamId=%s playbackId=%s", stream.ID, stream.PlaybackID)

deadline := time.Now().Add(rt.recordingWaitTime)
if rt.useForceURL {
deadline := time.Now().Add(rt.RecordingWaitTime)
if rt.UseForceURL {
deadline = time.Now().Add(5 * time.Second)
}

Expand All @@ -332,7 +313,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
for _, sess := range sessions {
// currently the assetID is the same as the sessionID so we could just query on that but just in case that
// ever changes, we can use the ListAssets call to find the asset
assets, _, err := rt.lapi.ListAssets(api.ListOptions{
assets, _, err := rt.API.ListAssets(api.ListOptions{
Limit: 1,
Filters: map[string]interface{}{
"sourceSessionId": sess.ID,
Expand All @@ -350,7 +331,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
}
asset := assets[0]

if code, err := checkPlaybackInfo(asset.PlaybackID, rt.lapi, lapiNoAPIKey); err != nil {
if code, err := checkPlaybackInfo(asset.PlaybackID, rt.API, lapiNoAPIKey); err != nil {
errCode, errs = code, append(errs, err)
} else {
// if we get playback before the processing is done it means source playback was provided
Expand All @@ -365,12 +346,12 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
}
}
}
if !sourcePlayback && !rt.skipSourcePlayback {
if !sourcePlayback && !rt.SkipSourcePlayback {
return 246, errors.New("source playback was not provided")
}

// check actual recordings playback
sessions, err = rt.lapi.GetSessionsNew(stream.ID, false)
sessions, err = rt.API.GetSessionsNew(stream.ID, false)
if err != nil {
glog.Errorf("Error getting sessions err=%v streamId=%s playbackId=%s", err, stream.ID, stream.PlaybackID)
return 252, err
Expand All @@ -386,7 +367,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.

for _, sess := range sessions {
statusShould := api.RecordingStatusReady
if rt.useForceURL {
if rt.UseForceURL {
statusShould = api.RecordingStatusWaiting
}
if sess.RecordingStatus != statusShould {
Expand All @@ -402,7 +383,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
if err = rt.isCancelled(); err != nil {
return 0, err
}
if rt.mp4 {
if rt.TestMP4 {
es, err := rt.checkRecordingMp4(stream, sess.Mp4Url, testDuration)
if err != nil {
return es, err
Expand All @@ -420,7 +401,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.

glog.Infof("Done Record Test. streamId=%s playbackId=%s", stream.ID, stream.PlaybackID)

rt.lapi.DeleteStream(stream.ID)
rt.API.DeleteStream(stream.ID)
return 0, nil
}

Expand All @@ -443,14 +424,14 @@ func checkPlaybackInfo(playbackID string, withKey, withoutKey *api.Client) (int,
}

func (rt *recordTester) getIngestInfo() (*api.Ingest, error) {
if rt.ingest != nil {
return rt.ingest, nil
if rt.Ingest != nil {
return rt.Ingest, nil
}
var ingests []api.Ingest
apiTry := 0
for {
var err error
ingests, err = rt.lapi.Ingest(false)
ingests, err = rt.API.Ingest(false)
if err != nil {
if testers.Timedout(err) && apiTry < 3 {
apiTry++
Expand All @@ -472,11 +453,11 @@ func (rt *recordTester) doOneHTTPStream(fileName, streamName, broadcasterURL str
var err error
apiTry := 0
for {
session, err = rt.lapi.CreateStream(api.CreateStreamReq{
session, err = rt.API.CreateStream(api.CreateStreamReq{
Name: streamName,
Record: true,
RecordingSpec: rt.recordingSpec,
RecordObjectStoreId: rt.recordObjectStoreId,
RecordingSpec: rt.RecordingSpec,
RecordObjectStoreId: rt.RecordObjectStoreId,
ParentID: stream.ID,
})
if err != nil {
Expand Down Expand Up @@ -572,8 +553,8 @@ func (rt *recordTester) checkRecordingHls(stream *api.Stream, url string, stream
vs := downloader.VODStats()
rt.vodStats = vs
expectedProfiles := len(api.StandardProfiles) + 1
if rt.recordingSpec != nil && rt.recordingSpec.Profiles != nil {
expectedProfiles = len(*rt.recordingSpec.Profiles) + 1
if rt.RecordingSpec != nil && rt.RecordingSpec.Profiles != nil {
expectedProfiles = len(*rt.RecordingSpec.Profiles) + 1
}
if len(vs.SegmentsNum) != expectedProfiles {
glog.Warningf("Number of renditions doesn't match! Has %d should %d. streamId=%s playbackId=%s", len(vs.SegmentsNum), len(api.StandardProfiles)+1, stream.ID, stream.PlaybackID)
Expand Down Expand Up @@ -605,7 +586,7 @@ func (rt *recordTester) VODStats() model.VODStats {

func (rt *recordTester) Clean() {
if rt.streamID != "" {
rt.lapi.DeleteStream(rt.streamID)
rt.API.DeleteStream(rt.streamID)
}
}

Expand Down

0 comments on commit 7f8c979

Please sign in to comment.