Skip to content

Commit

Permalink
Handle views for live videos
Browse files Browse the repository at this point in the history
  • Loading branch information
Chocobozzz committed Nov 6, 2020
1 parent 44b72e0 commit cfe3d3a
Show file tree
Hide file tree
Showing 7 changed files with 190 additions and 24 deletions.
45 changes: 33 additions & 12 deletions server/controllers/api/videos/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import { liveRouter } from './live'
import { ownershipVideoRouter } from './ownership'
import { rateVideoRouter } from './rate'
import { watchingRouter } from './watching'
import { LiveManager } from '@server/lib/live-manager'

const auditLogger = auditLoggerFactory('videos')
const videosRouter = express.Router()
Expand Down Expand Up @@ -416,26 +417,46 @@ async function getVideo (req: express.Request, res: express.Response) {
}

async function viewVideo (req: express.Request, res: express.Response) {
const videoInstance = res.locals.onlyImmutableVideo
const immutableVideoAttrs = res.locals.onlyImmutableVideo

const ip = req.ip
const exists = await Redis.Instance.doesVideoIPViewExist(ip, videoInstance.uuid)
const exists = await Redis.Instance.doesVideoIPViewExist(ip, immutableVideoAttrs.uuid)
if (exists) {
logger.debug('View for ip %s and video %s already exists.', ip, videoInstance.uuid)
return res.status(204).end()
logger.debug('View for ip %s and video %s already exists.', ip, immutableVideoAttrs.uuid)
return res.sendStatus(204)
}

await Promise.all([
Redis.Instance.addVideoView(videoInstance.id),
Redis.Instance.setIPVideoView(ip, videoInstance.uuid)
])
const video = await VideoModel.load(immutableVideoAttrs.id)

const serverActor = await getServerActor()
await sendView(serverActor, videoInstance, undefined)
const promises: Promise<any>[] = [
Redis.Instance.setIPVideoView(ip, video.uuid, video.isLive)
]

Hooks.runAction('action:api.video.viewed', { video: videoInstance, ip })
let federateView = true

return res.status(204).end()
// Increment our live manager
if (video.isLive && video.isOwned()) {
LiveManager.Instance.addViewTo(video.id)

// Views of our local live will be sent by our live manager
federateView = false
}

// Increment our video views cache counter
if (!video.isLive) {
promises.push(Redis.Instance.addVideoView(video.id))
}

if (federateView) {
const serverActor = await getServerActor()
promises.push(sendView(serverActor, video, undefined))
}

await Promise.all(promises)

Hooks.runAction('action:api.video.viewed', { video, ip })

return res.sendStatus(204)
}

async function getVideoDescription (req: express.Request, res: express.Response) {
Expand Down
11 changes: 8 additions & 3 deletions server/initializers/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,11 @@ const CONSTRAINTS_FIELDS = {
}
}

let VIDEO_VIEW_LIFETIME = 60000 * 60 // 1 hour
let VIEW_LIFETIME = {
VIDEO: 60000 * 60, // 1 hour
LIVE: 60000 * 5 // 5 minutes
}

let CONTACT_FORM_LIFETIME = 60000 * 60 // 1 hour

const VIDEO_TRANSCODING_FPS: VideoTranscodingFPS = {
Expand Down Expand Up @@ -726,7 +730,8 @@ if (isTestInstance() === true) {

REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1

VIDEO_VIEW_LIFETIME = 1000 // 1 second
VIEW_LIFETIME.VIDEO = 1000 // 1 second
VIEW_LIFETIME.LIVE = 1000 * 5 // 5 second
CONTACT_FORM_LIFETIME = 1000 // 1 second

JOB_ATTEMPTS['email'] = 1
Expand Down Expand Up @@ -838,7 +843,7 @@ export {
JOB_COMPLETED_LIFETIME,
HTTP_SIGNATURE,
VIDEO_IMPORT_STATES,
VIDEO_VIEW_LIFETIME,
VIEW_LIFETIME,
CONTACT_FORM_LIFETIME,
VIDEO_PLAYLIST_PRIVACIES,
PLUGIN_EXTERNAL_AUTH_TOKEN_LIFETIME,
Expand Down
19 changes: 14 additions & 5 deletions server/lib/activitypub/process/process-view.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Redis } from '../../redis'
import { ActivityCreate, ActivityView, ViewObject } from '../../../../shared/models/activitypub'
import { APProcessorOptions } from '../../../types/activitypub-processor.model'
import { MActorSignature } from '../../../types/models'
import { LiveManager } from '@server/lib/live-manager'

async function processViewActivity (options: APProcessorOptions<ActivityCreate | ActivityView>) {
const { activity, byActor } = options
Expand All @@ -19,19 +20,27 @@ export {
// ---------------------------------------------------------------------------

async function processCreateView (activity: ActivityView | ActivityCreate, byActor: MActorSignature) {
const videoObject = activity.type === 'View' ? activity.object : (activity.object as ViewObject).object
const videoObject = activity.type === 'View'
? activity.object
: (activity.object as ViewObject).object

const options = {
videoObject,
fetchType: 'only-immutable-attributes' as 'only-immutable-attributes',
fetchType: 'only-video' as 'only-video',
allowRefresh: false as false
}
const { video } = await getOrCreateVideoAndAccountAndChannel(options)

await Redis.Instance.addVideoView(video.id)

if (video.isOwned()) {
// Don't resend the activity to the sender
// Our live manager will increment the counter and send the view to followers
if (video.isLive) {
LiveManager.Instance.addViewTo(video.id)
return
}

await Redis.Instance.addVideoView(video.id)

// Forward the view but don't resend the activity to the sender
const exceptions = [ byActor ]
await forwardVideoRelatedActivity(activity, undefined, exceptions, video)
}
Expand Down
2 changes: 2 additions & 0 deletions server/lib/job-queue/handlers/video-live-ending.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ async function saveLive (video: MVideo, live: MVideoLive) {
await live.destroy()

video.isLive = false
// Reinit views
video.views = 0
video.state = VideoState.TO_TRANSCODE
video.duration = duration

Expand Down
50 changes: 49 additions & 1 deletion server/lib/live-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
} from '@server/helpers/ffmpeg-utils'
import { logger } from '@server/helpers/logger'
import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants'
import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME, WEBSERVER } from '@server/initializers/constants'
import { UserModel } from '@server/models/account/user'
import { VideoModel } from '@server/models/video/video'
import { VideoFileModel } from '@server/models/video/video-file'
Expand Down Expand Up @@ -61,6 +61,8 @@ class LiveManager {

private readonly transSessions = new Map<string, FfmpegCommand>()
private readonly videoSessions = new Map<number, string>()
// Values are Date().getTime()
private readonly watchersPerVideo = new Map<number, number[]>()
private readonly segmentsSha256 = new Map<string, Map<string, string>>()
private readonly livesPerUser = new Map<number, { liveId: number, videoId: number, size: number }[]>()

Expand Down Expand Up @@ -115,6 +117,8 @@ class LiveManager {
this.stop()
}
})

setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE)
}

run () {
Expand All @@ -131,6 +135,10 @@ class LiveManager {
this.rtmpServer = undefined
}

isRunning () {
return !!this.rtmpServer
}

getSegmentsSha256 (videoUUID: string) {
return this.segmentsSha256.get(videoUUID)
}
Expand All @@ -150,6 +158,19 @@ class LiveManager {
return currentLives.reduce((sum, obj) => sum + obj.size, 0)
}

addViewTo (videoId: number) {
if (this.videoSessions.has(videoId) === false) return

let watchers = this.watchersPerVideo.get(videoId)

if (!watchers) {
watchers = []
this.watchersPerVideo.set(videoId, watchers)
}

watchers.push(new Date().getTime())
}

private getContext () {
return context
}
Expand Down Expand Up @@ -331,6 +352,7 @@ class LiveManager {
logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', rtmpUrl)

this.transSessions.delete(sessionId)
this.watchersPerVideo.delete(videoLive.videoId)

Promise.all([ tsWatcher.close(), masterWatcher.close() ])
.catch(err => logger.error('Cannot close watchers of %s.', outPath, { err }))
Expand Down Expand Up @@ -426,6 +448,32 @@ class LiveManager {
return this.isAbleToUploadVideoWithCache(user.id)
}

private async updateLiveViews () {
if (!this.isRunning()) return

logger.info('Updating live video views.')

for (const videoId of this.watchersPerVideo.keys()) {
const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE

const watchers = this.watchersPerVideo.get(videoId)

const numWatchers = watchers.length

const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
video.views = numWatchers
await video.save()

await federateVideoIfNeeded(video, false)

// Only keep not expired watchers
const newWatchers = watchers.filter(w => w > notBefore)
this.watchersPerVideo.set(videoId, newWatchers)

logger.debug('New live video views for %s is %d.', video.url, numWatchers)
}
}

static get Instance () {
return this.instance || (this.instance = new this())
}
Expand Down
10 changes: 7 additions & 3 deletions server/lib/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
USER_EMAIL_VERIFY_LIFETIME,
USER_PASSWORD_RESET_LIFETIME,
USER_PASSWORD_CREATE_LIFETIME,
VIDEO_VIEW_LIFETIME,
VIEW_LIFETIME,
WEBSERVER,
TRACKER_RATE_LIMITS
} from '../initializers/constants'
Expand Down Expand Up @@ -118,8 +118,12 @@ class Redis {

/* ************ Views per IP ************ */

setIPVideoView (ip: string, videoUUID: string) {
return this.setValue(this.generateViewKey(ip, videoUUID), '1', VIDEO_VIEW_LIFETIME)
setIPVideoView (ip: string, videoUUID: string, isLive: boolean) {
const lifetime = isLive
? VIEW_LIFETIME.LIVE
: VIEW_LIFETIME.VIDEO

return this.setValue(this.generateViewKey(ip, videoUUID), '1', lifetime)
}

async doesVideoIPViewExist (ip: string, videoUUID: string) {
Expand Down
77 changes: 77 additions & 0 deletions server/tests/api/live/live.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ import {
testImage,
updateCustomSubConfig,
updateLive,
viewVideo,
wait,
waitJobs,
waitUntilLiveStarts
} from '../../../../shared/extra-utils'
import { FfmpegCommand } from 'fluent-ffmpeg'

const expect = chai.expect

Expand Down Expand Up @@ -419,6 +422,80 @@ describe('Test live', function () {
})
})

describe('Live views', function () {
let liveVideoId: string
let command: FfmpegCommand

async function countViews (expected: number) {
for (const server of servers) {
const res = await getVideo(server.url, liveVideoId)
const video: VideoDetails = res.body

expect(video.views).to.equal(expected)
}
}

before(async function () {
this.timeout(30000)

const liveAttributes = {
name: 'live video',
channelId: servers[0].videoChannel.id,
privacy: VideoPrivacy.PUBLIC
}

const res = await createLive(servers[0].url, servers[0].accessToken, liveAttributes)
liveVideoId = res.body.video.uuid

command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoId)
await waitUntilLiveStarts(servers[0].url, servers[0].accessToken, liveVideoId)
await waitJobs(servers)
})

it('Should display no views for a live', async function () {
await countViews(0)
})

it('Should view a live twice and display 1 view', async function () {
this.timeout(30000)

await viewVideo(servers[0].url, liveVideoId)
await viewVideo(servers[0].url, liveVideoId)

await wait(5000)

await waitJobs(servers)

await countViews(1)
})

it('Should wait 5 seconds and display 0 views', async function () {
this.timeout(30000)

await wait(5000)
await waitJobs(servers)

await countViews(0)
})

it('Should view a live on a remote and on local and display 2 views', async function () {
this.timeout(30000)

await viewVideo(servers[0].url, liveVideoId)
await viewVideo(servers[1].url, liveVideoId)
await viewVideo(servers[1].url, liveVideoId)

await wait(5000)
await waitJobs(servers)

await countViews(2)
})

after(async function () {
await stopFfmpeg(command)
})
})

describe('Live socket messages', function () {

async function createLiveWrapper () {
Expand Down

0 comments on commit cfe3d3a

Please sign in to comment.