Skip to content

Commit

Permalink
wrtc-lt: More improvements and features for load test (#347)
Browse files Browse the repository at this point in the history
* wrtc-lt/orch: Delete jobs in parallel for quick cleanup

* wrtc-lt/orch: Allow specifying an existing stream id

* wrtc-lt: Allow testing access controlled streams

* wrtc-lt/orch: Remove webrtc- prefix from job names

* .github: Add new params to github action

* .github: nits

* .github: Improve discord messages a bit

* [DEV-ONLY] Use pending container image

* .github: Fail early if access control is set without single-node

* wrtc-lt: Fix public key derivation

* wrtc-lt: Remove webrtc- prefix from job names

* .github: Replace stream-id with custom args

* .github: Remove explicit stream-id input

* Revert "[DEV-ONLY] Use pending container image"

This reverts commit b4413cb.
  • Loading branch information
victorges authored Nov 22, 2023
1 parent 19f8d4b commit 278a106
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 53 deletions.
59 changes: 45 additions & 14 deletions .github/workflows/load-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,23 @@ on:
type: string
default: 5m
streamer-region:
description: Google Cloud region to stream from
description: Streamer region (from Google Cloud)
type: string
required: true
default: us-central1
playback-protocol:
description: Playback protocol
required: true
type: choice
options: [ "WebRTC", "HLS" ]
default: "WebRTC"
access-control:
description: Enable JWT access control
type: boolean
required: true
default: false
playback-region-viewers-json:
description: Map of Google Cloud region to desired number of viewers
description: Number of viewers per region (from Google Cloud)
type: string
required: true
default: '{"us-central1":20,"europe-west2":20,"asia-southeast1":20}'
Expand All @@ -30,12 +41,6 @@ on:
type: string
required: true
default: 'https://storage.googleapis.com/lp_testharness_assets/countdown_720p_30fps_2sGOP_noBframes_5min.mp4'
playback-protocol:
description: Playback protocol
required: true
type: choice
options: [ "WebRTC", "HLS" ]
default: "WebRTC"
region:
description: "(Optional) Single-node test: Region of node to be tested (also needs pod index below). e.g.: mdw"
type: string
Expand All @@ -44,7 +49,11 @@ on:
description: "(Optional) Single-node test: Index of the Catalyst pod to be tested (also needs region above). e.g.: 0"
type: string
required: false
default: ''
extra-args:
description:
(Optional) Additional arguments to send to load test orchestrator
type: string
required: false
jobs:
load-test:
name: Run ${{ inputs.playback-protocol }} load test
Expand All @@ -66,7 +75,7 @@ jobs:
DISCORD_EMBEDS: >
[{
"title": "${{ inputs.playback-protocol }} load test starting",
"description": "A load test is starting (env=${{ inputs.environment }})",
"description": "A load test is starting in **${{ inputs.environment }}**",
"url": "${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}",
"timestamp": "${{ steps.timestamp.outputs.date }}",
"author": {
Expand Down Expand Up @@ -98,6 +107,10 @@ jobs:
"name": "Single-node target",
"value": "${{ inputs.region }}-${{ inputs.pod-index }}",
"inline": true
}, {
"name": "Access control",
"value": "${{ inputs.access-control }}",
"inline": true
}]
}]
Expand All @@ -110,11 +123,27 @@ jobs:
API_TOKEN="${{ secrets.LOAD_TEST_PROD_API_KEY }}"
STREAMER_BASE_URL="rtmp://rtmp.livepeer.com/live/"
PLAYBACK_BASE_URL="https://lvpr.tv/"
if [ "${{ inputs.access-control }}" = "true" ]; then
if [ -z "${{ inputs.region }}" ] && [ -z "${{ inputs.pod-index }}" ]; then
echo "Error: access-control is only available for single node tests"
exit 1
fi
PLAYBACK_JWT_PRIVATE_KEY="${{ secrets.LOAD_TEST_PROD_JWT_PRIVATE_KEY }}"
fi
else
API_SERVER="livepeer.monster"
API_TOKEN="${{ secrets.LOAD_TEST_STAGING_API_KEY }}"
STREAMER_BASE_URL="rtmp://rtmp.livepeer.monster/live/"
PLAYBACK_BASE_URL="https://monster.lvpr.tv/"
if [ "${{ inputs.access-control }}" = "true" ]; then
if [ -z "${{ inputs.region }}" ] && [ -z "${{ inputs.pod-index }}" ]; then
echo "Error: access-control is only available for single node tests"
exit 1
fi
PLAYBACK_JWT_PRIVATE_KEY="${{ secrets.LOAD_TEST_STAGING_JWT_PRIVATE_KEY }}"
fi
fi
# Override with inputs
Expand Down Expand Up @@ -142,9 +171,10 @@ jobs:
echo "::set-output name=streamer-base-url::${STREAMER_BASE_URL}"
echo "::set-output name=playback-base-url::${PLAYBACK_BASE_URL}"
echo "::set-output name=playback-manifest-url::${PLAYBACK_MANIFEST_URL}"
echo "::set-output name=playback-jwt-private-key::${PLAYBACK_JWT_PRIVATE_KEY}"
- name: Load Test
run: webrtc-load-tester orchestrator
run: webrtc-load-tester orchestrator ${{ inputs.extra-args }}
env:
LT_WEBRTC_DURATION: ${{ inputs.duration }}
LT_WEBRTC_API_SERVER: ${{ steps.env.outputs.api-server }}
Expand All @@ -154,6 +184,7 @@ jobs:
LT_WEBRTC_STREAMER_INPUT_FILE: ${{ inputs.streamer-input-file }}
LT_WEBRTC_PLAYBACK_BASE_URL: ${{ steps.env.outputs.playback-base-url }}
LT_WEBRTC_PLAYBACK_MANIFEST_URL: "${{ steps.env.outputs.playback-manifest-url }}"
LT_WEBRTC_PLAYBACK_JWT_PRIVATE_KEY: ${{ steps.env.outputs.playback-jwt-private-key }}
LT_WEBRTC_PLAYBACK_VIEWERS_PER_WORKER: 10
LT_WEBRTC_PLAYBACK_VIEWERS_PER_CPU: 2
LT_WEBRTC_PLAYBACK_MEMORY_PER_VIEWER_MIB: 400
Expand All @@ -169,7 +200,7 @@ jobs:
DISCORD_EMBEDS: >
[{
"title": "${{ inputs.playback-protocol }} load test finished successfully",
"description": "The load test (env=${{ inputs.environment }}) has finished successfully",
"description": "The load test has finished successfully",
"url": "${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}",
"color": 24576,
"author": {
Expand All @@ -196,7 +227,7 @@ jobs:
DISCORD_USERNAME: ${{ github.triggering_actor }}
DISCORD_EMBEDS: >
[{
"title": "${{ inputs.playback-protocol }} load test has failed! (env=${{ inputs.environment }})",
"title": "${{ inputs.playback-protocol }} load test has failed!,
"url": "${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}",
"color": 8388608,
"author": {
Expand All @@ -223,7 +254,7 @@ jobs:
DISCORD_USERNAME: ${{ github.triggering_actor }}
DISCORD_EMBEDS: >
[{
"title": "${{ inputs.playback-protocol }} load test was cancelled! (env=${{ inputs.environment }})",
"title": "${{ inputs.playback-protocol }} load test was cancelled!",
"url": "${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}",
"color": 11778048,
"author": {
Expand Down
29 changes: 23 additions & 6 deletions cmd/webrtc-load-tester/gcloud/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"sync"
"time"

run "cloud.google.com/go/run/apiv2"
Expand Down Expand Up @@ -51,7 +52,7 @@ type JobSpec struct {
}

func CreateJob(ctx context.Context, spec JobSpec) (job *runpb.Job, exec *runpb.Execution, err error) {
jobName := fmt.Sprintf("webrtc-load-tester-%s-%s-%s", spec.TestID[:8], spec.Role, spec.Region)
jobName := fmt.Sprintf("load-tester-%s-%s-%s", spec.TestID[:8], spec.Role, spec.Region)
labels := map[string]string{
"webrtc-load-tester": "true",
"load-test-id": spec.TestID,
Expand Down Expand Up @@ -110,15 +111,17 @@ func CreateJob(ctx context.Context, spec JobSpec) (job *runpb.Job, exec *runpb.E
return job, exec, nil
}

func FullJobName(region, name string) string {
return fmt.Sprintf("projects/%s/locations/%s/jobs/%s", projectID, region, name)
}

// DeleteJob is meant to be run in background/defer so it doesn't get a ctx and doesn't return an error
func DeleteJob(region, name string) {
func DeleteJob(name string) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

fullJobName := fmt.Sprintf("projects/%s/locations/%s/jobs/%s", projectID, region, name)

it := executionsClient.ListExecutions(ctx, &runpb.ListExecutionsRequest{
Parent: fullJobName,
Parent: name,
PageSize: 1000,
})

Expand Down Expand Up @@ -146,7 +149,7 @@ func DeleteJob(region, name string) {
}

glog.Infof("Deleting job: %s", simpleName(name))
deleteOp, err := jobsClient.DeleteJob(ctx, &runpb.DeleteJobRequest{Name: fullJobName})
deleteOp, err := jobsClient.DeleteJob(ctx, &runpb.DeleteJobRequest{Name: name})
if err != nil {
glog.Errorf("Error deleting job %s: %v\n", name, err)
return
Expand All @@ -159,6 +162,20 @@ func DeleteJob(region, name string) {
}
}

// DeleteJobs is a convenience helper that deletes all of the given jobs
// concurrently, blocking until every deletion attempt has completed.
func DeleteJobs(jobs []string) {
	var wg sync.WaitGroup
	for _, job := range jobs {
		wg.Add(1)
		// Pass the job name as an argument so each goroutine operates on
		// its own copy of the loop variable.
		go func(name string) {
			defer wg.Done()
			DeleteJob(name)
		}(job)
	}
	wg.Wait()
}

func CheckExecutionStatus(ctx context.Context, name string) (finished bool) {
exec, err := executionsClient.GetExecution(ctx, &runpb.GetExecutionRequest{Name: name})
if err != nil {
Expand Down
59 changes: 41 additions & 18 deletions cmd/webrtc-load-tester/roles/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import (
const jobsPollingInterval = 1 * time.Minute

type loadTestArguments struct {
TestID string // set one to recover a running test. auto-generated if not provided
TestID string // set one to recover a running test. auto-generated if not provided
StreamID string

// Google Cloud
GoogleCredentialsJSON string
Expand All @@ -40,14 +41,15 @@ type loadTestArguments struct {
}
Playback struct {
BaseURL string
ManifestURL string
JWTPrivateKey string
RegionViewersJSON map[string]int
ViewersPerWorker int
ViewersPerCPU float64
MemoryPerViewerMiB int
DelayBetweenRegions time.Duration
BaseScreenshotFolderOS *url.URL
ScreenshotPeriod time.Duration
ManifestURL string
}
}

Expand All @@ -60,6 +62,7 @@ func Orchestrator() {

utils.ParseFlags(func(fs *flag.FlagSet) {
fs.StringVar(&cliFlags.TestID, "test-id", "", "ID of previous test to recover. If not provided, a new test will be started with a random ID")
fs.StringVar(&cliFlags.StreamID, "stream-id", "", "ID of existing stream to use. Notice that this will be used as the test ID as well but spawn new jobs instead of recovering existing ones")
fs.StringVar(&cliFlags.GoogleCredentialsJSON, "google-credentials-json", "", "Google Cloud service account credentials JSON with access to Cloud Run")
fs.StringVar(&cliFlags.GoogleProjectID, "google-project-id", "livepeer-test", "Google Cloud project ID")
fs.StringVar(&cliFlags.ContainerImage, "container-image", "livepeer/webrtc-load-tester:master", "Container image to use for the worker jobs")
Expand All @@ -72,6 +75,7 @@ func Orchestrator() {

fs.StringVar(&cliFlags.Playback.BaseURL, "playback-base-url", "https://monster.lvpr.tv/", "Base URL for the player page")
fs.StringVar(&cliFlags.Playback.ManifestURL, "playback-manifest-url", "", "URL for playback")
fs.StringVar(&cliFlags.Playback.JWTPrivateKey, "playback-jwt-private-key", "", "Private key to sign JWT tokens for access controlled playback")
utils.JSONVarFlag(fs, &cliFlags.Playback.RegionViewersJSON, "playback-region-viewers-json", `{"us-central1":100,"europe-west2":100}`, "JSON object of Google Cloud regions to the number of viewers that should be simulated there. Notice that the values must be multiples of playback-viewers-per-worker, and up to 1000 x that")
fs.IntVar(&cliFlags.Playback.ViewersPerWorker, "playback-viewers-per-worker", 10, "Number of viewers to simulate per worker")
fs.Float64Var(&cliFlags.Playback.ViewersPerCPU, "playback-viewers-per-cpu", 2, "Number of viewers to allocate per CPU on player jobs")
Expand Down Expand Up @@ -133,15 +137,29 @@ func initClients(cliFlags loadTestArguments) {
glog.Infof("Total number of viewers: %d\n", totalViewers)
}

func runLoadTest(ctx context.Context, args loadTestArguments) error {
stream, err := studioApi.CreateStream(api.CreateStreamReq{
Name: "webrtc-load-test-" + time.Now().UTC().Format(time.RFC3339),
})
if err != nil {
return fmt.Errorf("failed to create stream: %w", err)
}
func runLoadTest(ctx context.Context, args loadTestArguments) (err error) {
var stream *api.Stream
if args.StreamID != "" {
stream, err = studioApi.GetStream(args.StreamID, false)
if err != nil {
return fmt.Errorf("failed to retrieve stream: %w", err)
}
glog.Infof("Retrieved stream with name: %s", stream.Name)
} else {
var playbackPolicy *api.PlaybackPolicy
if args.Playback.JWTPrivateKey != "" {
playbackPolicy = &api.PlaybackPolicy{Type: "jwt"}
}

glog.Infof("Stream created: %s", stream.ID)
stream, err = studioApi.CreateStream(api.CreateStreamReq{
Name: "load-test-" + time.Now().UTC().Format(time.RFC3339),
PlaybackPolicy: playbackPolicy,
})
if err != nil {
return fmt.Errorf("failed to create stream: %w", err)
}
glog.Infof("Stream created: %s", stream.ID)
}

// Use the stream ID as the test ID for simplicity. Helps on recovering a running test as well.
args.TestID = stream.ID
Expand All @@ -150,11 +168,14 @@ func runLoadTest(ctx context.Context, args loadTestArguments) error {

glog.Infof("Access the stream at: https://%s", path.Join(args.APIServer, "/dashboard/streams", stream.ID))

_, streamer, err := gcloud.CreateJob(ctx, streamerJobSpec(args, stream.StreamKey))
var jobsToDelete []string
defer func() { gcloud.DeleteJobs(jobsToDelete) }()

streamerJob, streamer, err := gcloud.CreateJob(ctx, streamerJobSpec(args, stream.StreamKey))
if err != nil {
return fmt.Errorf("failed to create streamer job: %w", err)
}
defer gcloud.DeleteJob(args.Streamer.Region, streamer.Job)
jobsToDelete = append(jobsToDelete, streamerJob.Name)

glog.Infof("Streamer job created on region %s: %s (execution: %s)", args.Streamer.Region, streamer.Job, streamer.Name)

Expand All @@ -163,11 +184,11 @@ func runLoadTest(ctx context.Context, args loadTestArguments) error {
glog.Infof("Waiting %s before starting player in %s", args.Playback.DelayBetweenRegions, region)
wait(ctx, args.Playback.DelayBetweenRegions)

_, viewer, err := gcloud.CreateJob(ctx, playerJobSpec(args, region, numViewers, stream.PlaybackID))
viewerJob, viewer, err := gcloud.CreateJob(ctx, playerJobSpec(args, region, numViewers, stream.PlaybackID))
if err != nil {
return fmt.Errorf("failed to create player job: %w", err)
}
defer gcloud.DeleteJob(region, viewer.Job)
jobsToDelete = append(jobsToDelete, viewerJob.Name)

glog.Infof("Player job created on region %s: %s (execution: %s)", region, viewer.Job, viewer.Name)
executions = append(executions, viewer.Name)
Expand All @@ -185,9 +206,10 @@ func recoverLoadTest(ctx context.Context, args loadTestArguments) error {
glog.Infof("Recovering test with ID %s", args.TestID)
wait(ctx, 5*time.Second)

// TODO: Find the stream by name using the testID

var executions []string
var jobsToDelete []string
defer func() { gcloud.DeleteJobs(jobsToDelete) }()

for region := range args.Playback.RegionViewersJSON {
regionExecs, err := gcloud.ListExecutions(ctx, region, args.TestID)
if err != nil {
Expand All @@ -198,10 +220,10 @@ func recoverLoadTest(ctx context.Context, args loadTestArguments) error {
for _, exec := range regionExecs {
executions = append(executions, exec.Name)

if job := exec.Job; !ownedJobs[job] {
if job := gcloud.FullJobName(region, exec.Job); !ownedJobs[job] {
ownedJobs[job] = true
glog.Infof("Taking ownership of %s job on region %s", job, region)
defer gcloud.DeleteJob(region, job)
jobsToDelete = append(jobsToDelete, job)
}
}
}
Expand Down Expand Up @@ -288,6 +310,7 @@ func playerJobSpec(args loadTestArguments, region string, viewers int, playbackI
"-base-url", args.Playback.BaseURL,
"-playback-id", playbackID,
"-playback-url", playbackURL,
"-jwt-private-key", args.Playback.JWTPrivateKey,
"-simultaneous", strconv.Itoa(simultaneous),
"-duration", args.TestDuration.String(),
}
Expand Down
Loading

0 comments on commit 278a106

Please sign in to comment.