Skip to content

Commit

Permalink
fix: implement "slot pad" mode in random slots (#1000)
Browse files Browse the repository at this point in the history
This enables more tight packing of random slots (i.e. moderate length
slots w/ short shows) by aligngin slots to pad times, instead of
individual episodes within a schedule

This also includes a host of other fixes and improvements for the random
slot algorithm, UI, and slots in general. It also changes the
implementation of random slots to use a generator function - this is a
trial usage
  • Loading branch information
chrisbenincasa authored Dec 5, 2024
1 parent aeee37b commit ec8b943
Show file tree
Hide file tree
Showing 14 changed files with 537 additions and 325 deletions.
28 changes: 20 additions & 8 deletions server/src/db/ChannelDB.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { typedProperty } from '@/types/path.ts';
import { Result } from '@/types/result.ts';
import { MarkNullable, Maybe } from '@/types/util.ts';
import { asyncPool } from '@/util/asyncPool.ts';
import dayjs from '@/util/dayjs.ts';
import { fileExists } from '@/util/fsUtil.ts';
import { LoggerFactory } from '@/util/logging/LoggerFactory.ts';
import { MutexMap } from '@/util/mutexMap.ts';
Expand All @@ -22,8 +23,6 @@ import {
Watermark,
} from '@tunarr/types';
import { UpdateChannelProgrammingRequest } from '@tunarr/types/api';
import dayjs from 'dayjs';
import duration from 'dayjs/plugin/duration.js';
import { jsonArrayFrom } from 'kysely/helpers/sqlite';
import {
chunk,
Expand Down Expand Up @@ -100,8 +99,6 @@ import { programExternalIdString } from './schema/Program.ts';
import { ChannelTranscodingSettings } from './schema/base.ts';
import { ChannelWithPrograms as RawChannelWithPrograms } from './schema/derivedTypes.js';

dayjs.extend(duration);

// We use this to chunk super huge channel / program relation updates because
// of the way that mikro-orm generates these (e.g. "delete from XYZ where () or () ...").
// When updating a _huge_ channel, we hit internal sqlite limits, so we must chunk these
Expand Down Expand Up @@ -769,10 +766,25 @@ export class ChannelDB {
// programs: req.body.programs,
// },
// ),
const { programs, startTime } =
req.type === 'time'
? await scheduleTimeSlots(req.schedule, req.programs)
: await scheduleRandomSlots(req.schedule, req.programs);
let programs: ChannelProgram[];
let startTime: number;
if (req.type === 'time') {
({ programs, startTime } = await scheduleTimeSlots(
req.schedule,
req.programs,
));
} else {
const start = dayjs.tz();
startTime = +start;
programs = [];
for await (const p of scheduleRandomSlots(
req.schedule,
req.programs,
start,
)) {
programs.push(p);
}
}

const newLineup = await createNewLineup(programs);

Expand Down
4 changes: 4 additions & 0 deletions server/src/util/dayjs.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import dayjs from 'dayjs';
import duration from 'dayjs/plugin/duration.js';
import timezone from 'dayjs/plugin/timezone.js';
import utc from 'dayjs/plugin/utc.js';

dayjs.extend(duration);
dayjs.extend(timezone);
dayjs.extend(utc);

export default dayjs;
2 changes: 1 addition & 1 deletion shared/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"bundle": "tsup",
"build": "tsup --dts",
"clean": "rimraf ./build/",
"dev": "tsup --watch",
"dev": "tsup --watch --dts",
"test": "vitest"
},
"dependencies": {
Expand Down
177 changes: 107 additions & 70 deletions shared/src/services/randomSlotsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import {
last,
map,
reject,
slice,
sortBy,
} from 'lodash-es';
import { MersenneTwister19937, Random } from 'random-js';
Expand All @@ -40,35 +39,37 @@ export type PaddedProgram = {

// Adds flex time to the end of a programs array.
// If the final program is flex itself, just extends it
// Returns a new array and amount to increment the cursor
function pushOrExtendFlex(
lineup: ChannelProgram[],
flexDuration: Duration,
): [number, ChannelProgram[]] {
const durationMs = flexDuration.asMilliseconds();
if (durationMs <= 0) {
return [0, lineup];
}

const lastLineupItem = last(lineup);
if (lastLineupItem && isFlexProgram(lastLineupItem)) {
const newDuration = lastLineupItem.duration + durationMs;
const newItem: FlexProgram = {
type: 'flex',
duration: newDuration,
persisted: false,
};
return [durationMs, [...slice(lineup, 0, lineup.length - 1), newItem]];
}

const newItem: FlexProgram = {
type: 'flex',
persisted: false,
duration: durationMs,
};

return [durationMs, [...lineup, newItem]];
}
// Mutates the lineup array
// function pushOrExtendFlex(
// lineup: ChannelProgram[],
// flexDuration: Duration,
// ): [number, ChannelProgram[]] {
// const durationMs = flexDuration.asMilliseconds();
// if (durationMs <= 0) {
// return [0, lineup];
// }

// const lastLineupItem = last(lineup);
// if (lastLineupItem && isFlexProgram(lastLineupItem)) {
// const newDuration = lastLineupItem.duration + durationMs;
// const newItem: FlexProgram = {
// type: 'flex',
// duration: newDuration,
// persisted: false,
// };
// lineup[lineup.length - 1] = newItem;
// return [durationMs, lineup];
// }

// const newItem: FlexProgram = {
// type: 'flex',
// persisted: false,
// duration: durationMs,
// };

// lineup.push(newItem);
// return [durationMs, lineup];
// }

function createPaddedProgram(program: ChannelProgram, padMs: number) {
const rem = program.duration % padMs;
Expand Down Expand Up @@ -115,11 +116,20 @@ export function distributeFlex(
});
}

const createFlex = function (flexDuration: Duration) {
return {
type: 'flex',
persisted: false,
duration: +flexDuration,
} satisfies FlexProgram;
};

// eslint-disable-next-line @typescript-eslint/require-await
export async function scheduleRandomSlots(
export async function* scheduleRandomSlots(
schedule: RandomSlotSchedule,
channelProgramming: ChannelProgram[],
) {
startTime: dayjs.Dayjs = dayjs.tz(),
): AsyncGenerator<ChannelProgram> {
// Load programs
// TODO include custom programs!
const allPrograms = reject(channelProgramming, (p) => isFlexProgram(p));
Expand All @@ -129,29 +139,28 @@ export async function scheduleRandomSlots(
programBySlotType,
);

const now = dayjs.tz();
const t0 = now;
const t0 = startTime;
const upperLimit = t0.add(schedule.maxDays + 1, 'day');

let timeCursor = t0;
let channelPrograms: ChannelProgram[] = [];

const pushFlex = (flexDuration: Duration) => {
const [inc, newPrograms] = pushOrExtendFlex(channelPrograms, flexDuration);
timeCursor = timeCursor.add(inc);
channelPrograms = newPrograms;
const advanceTime = (by: number | Duration) => {
timeCursor = dayjs.isDuration(by) ? timeCursor.add(by) : timeCursor.add(by);
};

const slotsLastPlayedMap: Record<number, number> = {};

while (timeCursor.isBefore(upperLimit)) {
// await flushEventLoop();
let currSlot: RandomSlot | null = null;
let remaining: number = 0;

// Pad time
const m = timeCursor.mod(schedule.padMs).asMilliseconds();
const m = +timeCursor.mod(schedule.padMs);
if (m > constants.SLACK && schedule.padMs - m > constants.SLACK) {
pushFlex(dayjs.duration(schedule.padMs - m));
const duration = dayjs.duration(schedule.padMs - m);
yield createFlex(duration);
advanceTime(duration);
continue;
}

Expand All @@ -175,41 +184,46 @@ export async function scheduleRandomSlots(

if (random.bool(slot.weight, n)) {
currSlot = slot;
// slotIndex = i;
remaining = slot.durationMs;
}
}

if (isNull(currSlot)) {
const duration = dayjs.duration(+minNextTime.subtract(+timeCursor));
pushFlex(
// Weird
duration,
);
timeCursor = timeCursor.add(duration);
yield createFlex(duration);
advanceTime(duration);
continue;
}

const program = getNextProgramForSlot(
let program = getNextProgramForSlot(
currSlot,
programmingIteratorsById,
remaining,
);

if (isNull(program) || isFlexProgram(program)) {
pushFlex(dayjs.duration(remaining));
yield createFlex(dayjs.duration(remaining));
advanceTime(remaining);
continue;
}

// HACK
if (program.type === 'redirect') {
program = { ...program, duration: remaining };
}

// Program longer than we have left? Add it and move on...
if (program && program.duration > remaining) {
channelPrograms.push(program);
yield program;
advanceIterator(currSlot, programmingIteratorsById);
timeCursor = timeCursor.add(program.duration);
advanceTime(program.duration);
continue;
}

const paddedProgram = createPaddedProgram(program, schedule.padMs);
const paddedProgram = createPaddedProgram(
program,
schedule.padStyle === 'slot' ? 1 : schedule.padMs,
);
let totalDuration = paddedProgram.totalDuration;
advanceIterator(currSlot, programmingIteratorsById);
const paddedPrograms: PaddedProgram[] = [paddedProgram];
Expand All @@ -224,21 +238,25 @@ export async function scheduleRandomSlots(
if (totalDuration + nextProgram.duration > remaining) {
break;
}
const nextPadded = createPaddedProgram(nextProgram, schedule.padMs);
const nextPadded = createPaddedProgram(
nextProgram,
schedule.padStyle === 'slot' ? 1 : schedule.padMs,
);
paddedPrograms.push(nextPadded);
advanceIterator(currSlot, programmingIteratorsById);
totalDuration += nextPadded.totalDuration;
}

let remainingTimeInSlot = 0;

// Decipher this...
const temt = timeCursor
.add(totalDuration)
.mod(schedule.padMs)
.asMilliseconds();
if (temt >= constants.SLACK && temt < schedule.padMs - constants.SLACK) {
remainingTimeInSlot = schedule.padMs - temt;
const startOfNextBlock = +timeCursor.add(totalDuration);
// .mod(schedule.padMs)
// .asMilliseconds();
if (
startOfNextBlock % schedule.padMs >= constants.SLACK &&
startOfNextBlock % schedule.padMs < schedule.padMs - constants.SLACK
) {
remainingTimeInSlot =
schedule.padMs - (startOfNextBlock % schedule.padMs);
}

// We have two options here if there is remaining time in the slot
Expand Down Expand Up @@ -266,15 +284,34 @@ export async function scheduleRandomSlots(
lastProgram.totalDuration += remainingTimeInSlot;
}

forEach(paddedPrograms, ({ program, padMs }) => {
channelPrograms.push(program);
timeCursor = timeCursor.add(program.duration);
pushFlex(dayjs.duration(padMs));
});
let done = false;
for (const { program, padMs } of paddedPrograms) {
if (+timeCursor + program.duration > +upperLimit) {
done = true;
break;
}
yield program;
advanceTime(program.duration);
if (+timeCursor + padMs > +upperLimit) {
done = true;
break;
}
yield createFlex(dayjs.duration(padMs));
advanceTime(padMs);
}

if (done) {
break;
}
// forEach(paddedPrograms, ({ program, padMs }) => {
// channelPrograms.push(program);
// timeCursor = timeCursor.add(program.duration);
// yield* pushFlex(dayjs.duration(padMs));
// });
}

return {
programs: channelPrograms,
startTime: +t0,
};
// return {
// programs: channelPrograms,
// startTime: +t0,
// };
}
9 changes: 9 additions & 0 deletions shared/src/services/slotSchedulerUtil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,15 @@ export function createProgramIterators(
const program = first(programBySlotType.redirect[slotId] ?? []);
if (program) {
acc[id] = new StaticProgramIterator(program);
} else {
acc[id] = new StaticProgramIterator({
type: 'redirect',
channel: slot.programming.channelId,
channelName: slot.programming.channelName ?? '',
channelNumber: -1,
duration: 1,
persisted: false,
});
}
} else if (slot.programming.type === 'custom-show') {
acc[id] =
Expand Down
12 changes: 9 additions & 3 deletions shared/src/util/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
export { mod as dayjsMod } from './dayjsExtensions.js';
export * from './plexSearchUtil.js';
export * as seq from './seq.js';
import { ChannelProgram } from '@tunarr/types';
import { PlexMedia } from '@tunarr/types/plex';
import { isNull } from 'lodash-es';
import isFunction from 'lodash-es/isFunction.js';
import { MarkRequired } from 'ts-essentials';
import type { PerTypeCallback } from '../types/index.js';
import { isNull } from 'lodash-es';
export { mod as dayjsMod } from './dayjsExtensions.js';
export * as seq from './seq.js';

export function applyOrValueNoRest<Super, X extends Super, T>(
f: ((m: X) => T) | T,
Expand Down Expand Up @@ -119,3 +119,9 @@ export function nullToUndefined<T>(x: T | null | undefined): T | undefined {
}
return x;
}

export const flushEventLoop = async () => {
return new Promise((resolve) => {
setTimeout(resolve, 0);
});
};
1 change: 1 addition & 0 deletions types/src/api/Scheduling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const FlexProgrammingSlotSchema = z.object({
const RedirectProgrammingSlotSchema = z.object({
type: z.literal('redirect'),
channelId: z.string(),
channelName: z.string().optional(),
});

const CustomShowProgrammingSlotSchema = z.object({
Expand Down
Loading

0 comments on commit ec8b943

Please sign in to comment.