Skip to content

Commit

Permalink
Compositor: Add support for mismatched timestamps.
Browse files Browse the repository at this point in the history
This means we now require 2+ input frames per input, and compare the primary
stream timestamp with secondary stream timestamps in order to select the
correct output timestamp. We must also release frames and apply back-pressure
as soon as possible to avoid blocking upstream VFPs.

Also, improve signalling of VFP onReadyToAcceptInputFrame

PiperOrigin-RevId: 553448965
  • Loading branch information
dway123 authored and tianyif committed Aug 7, 2023
1 parent 10f5890 commit 97bfbc4
Show file tree
Hide file tree
Showing 5 changed files with 371 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import static com.google.android.exoplayer2.util.Assertions.checkNotNull;
import static com.google.android.exoplayer2.util.Assertions.checkState;
import static java.lang.Math.abs;
import static java.lang.Math.max;

import android.content.Context;
import android.opengl.EGLContext;
Expand All @@ -34,9 +36,12 @@
import com.google.android.exoplayer2.util.Log;
import com.google.android.exoplayer2.util.Util;
import com.google.android.exoplayer2.util.VideoFrameProcessingException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
Expand All @@ -57,10 +62,11 @@
@Deprecated
public final class DefaultVideoCompositor implements VideoCompositor {
// TODO: b/262694346 - Flesh out this implementation by doing the following:
// * Handle mismatched timestamps
// * Use a lock to synchronize inputFrameInfos more narrowly, to reduce blocking.
// * If the primary stream ends, consider setting the secondary stream as the new primary stream,
// so that secondary stream frames aren't dropped.
// * Consider adding info about the timestamps for each input frame used to composite an output
// frame, to aid debugging and testing.

private static final String THREAD_NAME = "Effect:DefaultVideoCompositor:GlThread";
private static final String TAG = "DefaultVideoCompositor";
Expand All @@ -77,6 +83,7 @@ public final class DefaultVideoCompositor implements VideoCompositor {
@GuardedBy("this")
private final List<InputSource> inputSources;

@GuardedBy("this")
private boolean allInputsEnded; // Whether all inputSources have signaled end of input.

private final TexturePool outputTexturePool;
Expand Down Expand Up @@ -124,6 +131,13 @@ public DefaultVideoCompositor(
videoFrameProcessingTaskExecutor.submit(this::setupGlObjects);
}

/**
* {@inheritDoc}
*
* <p>The input source must be able to have at least two {@linkplain #queueInputTexture queued
* textures} before one texture is {@linkplain
* DefaultVideoFrameProcessor.ReleaseOutputTextureCallback released}.
*/
@Override
public synchronized int registerInputSource() {
inputSources.add(new InputSource());
Expand All @@ -133,14 +147,28 @@ public synchronized int registerInputSource() {
@Override
public synchronized void signalEndOfInputSource(int inputId) {
inputSources.get(inputId).isInputEnded = true;
boolean allInputsEnded = true;
for (int i = 0; i < inputSources.size(); i++) {
if (!inputSources.get(i).isInputEnded) {
return;
allInputsEnded = false;
break;
}
}
allInputsEnded = true;

this.allInputsEnded = allInputsEnded;
if (inputSources.get(PRIMARY_INPUT_ID).frameInfos.isEmpty()) {
listener.onEnded();
if (inputId == PRIMARY_INPUT_ID) {
releaseExcessFramesInAllSecondaryStreams();
}
if (allInputsEnded) {
listener.onEnded();
return;
}
}
if (inputId != PRIMARY_INPUT_ID && inputSources.get(inputId).frameInfos.size() == 1) {
// When a secondary stream ends input, composite if there was only one pending frame in the
// stream.
videoFrameProcessingTaskExecutor.submit(this::maybeComposite);
}
}

Expand All @@ -150,10 +178,19 @@ public synchronized void queueInputTexture(
GlTextureInfo inputTexture,
long presentationTimeUs,
DefaultVideoFrameProcessor.ReleaseOutputTextureCallback releaseTextureCallback) {
checkState(!inputSources.get(inputId).isInputEnded);
InputSource inputSource = inputSources.get(inputId);
checkState(!inputSource.isInputEnded);

InputFrameInfo inputFrameInfo =
new InputFrameInfo(inputTexture, presentationTimeUs, releaseTextureCallback);
inputSources.get(inputId).frameInfos.add(inputFrameInfo);
inputSource.frameInfos.add(inputFrameInfo);

if (inputId == PRIMARY_INPUT_ID) {
releaseExcessFramesInAllSecondaryStreams();
} else {
releaseExcessFramesInSecondaryStream(inputSource);
}

videoFrameProcessingTaskExecutor.submit(this::maybeComposite);
}

Expand All @@ -167,6 +204,56 @@ public void release() {
}
}

/** Releases excess frames from every input source except the primary one. */
private synchronized void releaseExcessFramesInAllSecondaryStreams() {
  for (int inputId = 0; inputId < inputSources.size(); inputId++) {
    if (inputId != PRIMARY_INPUT_ID) {
      releaseExcessFramesInSecondaryStream(inputSources.get(inputId));
    }
  }
}

/**
 * Release unneeded frames from the {@link InputSource} secondary stream.
 *
 * <p>After this method returns, of the frames that were present when the method execution began,
 * at most one frame with a timestamp at or before the primary stream's next timestamp remains in
 * the secondary stream.
 */
private synchronized void releaseExcessFramesInSecondaryStream(InputSource secondaryInputSource) {
  InputSource primaryInputSource = inputSources.get(PRIMARY_INPUT_ID);
  // If the primary stream output is ended, no more frames will be composited, so all secondary
  // frames can be released.
  if (primaryInputSource.frameInfos.isEmpty() && primaryInputSource.isInputEnded) {
    releaseFrames(
        secondaryInputSource,
        /* numberOfFramesToRelease= */ secondaryInputSource.frameInfos.size());
    return;
  }

  // Release all but the last secondary frame with time <= nextTimestampToComposite, so that 0-1
  // such frames remain. If the primary stream has no pending frame, nextTimestampToComposite is
  // C.TIME_UNSET and presumably no secondary frame matches the filter below — confirm that
  // C.TIME_UNSET is below all valid presentation timestamps.
  @Nullable InputFrameInfo nextPrimaryFrame = primaryInputSource.frameInfos.peek();
  long nextTimestampToComposite =
      nextPrimaryFrame != null ? nextPrimaryFrame.presentationTimeUs : C.TIME_UNSET;

  int numberOfSecondaryFramesBeforeOrAtNextTargetTimestamp =
      Iterables.size(
          Iterables.filter(
              secondaryInputSource.frameInfos,
              frame -> frame.presentationTimeUs <= nextTimestampToComposite));
  // Keep one matching frame so it can be composited with the next primary frame.
  releaseFrames(
      secondaryInputSource,
      /* numberOfFramesToRelease= */ max(
          numberOfSecondaryFramesBeforeOrAtNextTargetTimestamp - 1, 0));
}

/**
 * Removes {@code numberOfFramesToRelease} frames from the head of {@code inputSource}'s queue and
 * invokes each frame's release callback with its presentation time.
 */
private synchronized void releaseFrames(InputSource inputSource, int numberOfFramesToRelease) {
  int remainingReleases = numberOfFramesToRelease;
  while (remainingReleases-- > 0) {
    InputFrameInfo releasedFrame = inputSource.frameInfos.remove();
    releasedFrame.releaseCallback.release(releasedFrame.presentationTimeUs);
  }
}

// Below methods must be called on the GL thread.
private void setupGlObjects() throws GlUtil.GlException {
eglDisplay = GlUtil.getDefaultEglDisplay();
Expand All @@ -179,15 +266,11 @@ private void setupGlObjects() throws GlUtil.GlException {

private synchronized void maybeComposite()
throws VideoFrameProcessingException, GlUtil.GlException {
if (!isReadyToComposite()) {
ImmutableList<InputFrameInfo> framesToComposite = getFramesToComposite();
if (framesToComposite.isEmpty()) {
return;
}

List<InputFrameInfo> framesToComposite = new ArrayList<>();
for (int inputId = 0; inputId < inputSources.size(); inputId++) {
framesToComposite.add(inputSources.get(inputId).frameInfos.remove());
}

ensureGlProgramConfigured();

// TODO: b/262694346 -
Expand All @@ -208,40 +291,81 @@ private synchronized void maybeComposite()
syncObjects.add(syncObject);
textureOutputListener.onTextureRendered(
outputTexture,
/* presentationTimeUs= */ framesToComposite.get(0).presentationTimeUs,
/* presentationTimeUs= */ outputPresentationTimestampUs,
this::releaseOutputFrame,
syncObject);
for (int i = 0; i < framesToComposite.size(); i++) {
InputFrameInfo inputFrameInfo = framesToComposite.get(i);
inputFrameInfo.releaseCallback.release(inputFrameInfo.presentationTimeUs);
}

InputSource primaryInputSource = inputSources.get(PRIMARY_INPUT_ID);
releaseFrames(primaryInputSource, /* numberOfFramesToRelease= */ 1);
releaseExcessFramesInAllSecondaryStreams();

if (allInputsEnded && inputSources.get(PRIMARY_INPUT_ID).frameInfos.isEmpty()) {
listener.onEnded();
}
}

private synchronized boolean isReadyToComposite() {
/**
* Checks whether {@code inputSources} is able to composite, and if so, returns a list of {@link
* InputFrameInfo}s that should be composited next.
*
* <p>The first input frame info in the list is from the primary source. An empty list is
* returned if {@code inputSources} cannot composite now.
*/
private synchronized ImmutableList<InputFrameInfo> getFramesToComposite() {
if (outputTexturePool.freeTextureCount() == 0) {
return false;
return ImmutableList.of();
}
long compositeTimestampUs = C.TIME_UNSET;
for (int inputId = 0; inputId < inputSources.size(); inputId++) {
Queue<InputFrameInfo> inputFrameInfos = inputSources.get(inputId).frameInfos;
if (inputFrameInfos.isEmpty()) {
return false;
if (inputSources.get(inputId).frameInfos.isEmpty()) {
return ImmutableList.of();
}
}
ImmutableList.Builder<InputFrameInfo> framesToComposite = new ImmutableList.Builder<>();
InputFrameInfo primaryFrameToComposite =
inputSources.get(PRIMARY_INPUT_ID).frameInfos.element();
framesToComposite.add(primaryFrameToComposite);

long inputTimestampUs = checkNotNull(inputFrameInfos.peek()).presentationTimeUs;
for (int inputId = 0; inputId < inputSources.size(); inputId++) {
if (inputId == PRIMARY_INPUT_ID) {
compositeTimestampUs = inputTimestampUs;
continue;
}
// Select the secondary streams' frame that would be composited next. The frame selected is
// the closest-timestamp frame from the primary stream's frame, if all secondary streams have:
// 1. One or more frames, and the secondary stream has ended, or
// 2. Two or more frames, and at least one frame has timestamp greater than the target
// timestamp.
// The smaller timestamp is taken if two timestamps have the same distance from the primary.
InputSource secondaryInputSource = inputSources.get(inputId);
if (secondaryInputSource.frameInfos.size() == 1 && !secondaryInputSource.isInputEnded) {
return ImmutableList.of();
}
// TODO: b/262694346 - Allow for different frame-rates to be composited, by potentially
// dropping some frames in non-primary streams.
if (inputTimestampUs != compositeTimestampUs) {
throw new IllegalStateException("Non-matched timestamps not yet supported.");

long minTimeDiffFromPrimaryUs = Long.MAX_VALUE;
@Nullable InputFrameInfo secondaryFrameToComposite = null;
Iterator<InputFrameInfo> frameInfosIterator = secondaryInputSource.frameInfos.iterator();
while (frameInfosIterator.hasNext()) {
InputFrameInfo candidateFrame = frameInfosIterator.next();
long candidateTimestampUs = candidateFrame.presentationTimeUs;
long candidateAbsDistance =
abs(candidateTimestampUs - primaryFrameToComposite.presentationTimeUs);

if (candidateAbsDistance < minTimeDiffFromPrimaryUs) {
minTimeDiffFromPrimaryUs = candidateAbsDistance;
secondaryFrameToComposite = candidateFrame;
}

if (candidateTimestampUs > primaryFrameToComposite.presentationTimeUs
|| (!frameInfosIterator.hasNext() && secondaryInputSource.isInputEnded)) {
framesToComposite.add(checkNotNull(secondaryFrameToComposite));
break;
}
}
}
return true;
ImmutableList<InputFrameInfo> framesToCompositeList = framesToComposite.build();
if (framesToCompositeList.size() != inputSources.size()) {
return ImmutableList.of();
}
return framesToCompositeList;
}

private void releaseOutputFrame(long presentationTimeUs) {
Expand Down Expand Up @@ -299,7 +423,7 @@ private void drawFrame(
GlUtil.checkGlError();
}

private void releaseGlObjects() {
private synchronized void releaseGlObjects() {
try {
checkState(allInputsEnded);
outputTexturePool.deleteAllTextures();
Expand All @@ -320,7 +444,10 @@ private void releaseGlObjects() {

/** Holds information on an input source. */
private static final class InputSource {
// A queue of {@link InputFrameInfo}s, inserted in order from lower to higher {@code
// presentationTimeUs} values.
public final Queue<InputFrameInfo> frameInfos;

public boolean isInputEnded;

public InputSource() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ interface OnInputStreamProcessedListener {
}

private static final String TAG = "FinalShaderWrapper";
private static final int SURFACE_INPUT_CAPACITY = 1;

private final Context context;
private final List<GlMatrixTransformation> matrixTransformations;
Expand Down Expand Up @@ -160,7 +161,13 @@ public FinalShaderProgramWrapper(
@Override
public void setInputListener(InputListener inputListener) {
this.inputListener = inputListener;
maybeOnReadyToAcceptInputFrame();
int inputCapacity =
textureOutputListener == null
? SURFACE_INPUT_CAPACITY
: outputTexturePool.freeTextureCount();
for (int i = 0; i < inputCapacity; i++) {
inputListener.onReadyToAcceptInputFrame();
}
}

@Override
Expand Down Expand Up @@ -202,6 +209,7 @@ public void queueInputFrame(
} else {
availableFrames.add(Pair.create(inputTexture, presentationTimeUs));
}
inputListener.onReadyToAcceptInputFrame();
} else {
checkState(outputTexturePool.freeTextureCount() > 0);
renderFrame(
Expand All @@ -210,7 +218,6 @@ public void queueInputFrame(
presentationTimeUs,
/* renderTimeNs= */ presentationTimeUs * 1000);
}
maybeOnReadyToAcceptInputFrame();
}

@Override
Expand All @@ -224,12 +231,13 @@ private void releaseOutputFrame(long presentationTimeUs) {
}

private void releaseOutputFrameInternal(long presentationTimeUs) throws GlUtil.GlException {
checkState(textureOutputListener != null);
while (outputTexturePool.freeTextureCount() < outputTexturePool.capacity()
&& checkNotNull(outputTextureTimestamps.peek()) <= presentationTimeUs) {
outputTexturePool.freeTexture();
outputTextureTimestamps.remove();
GlUtil.deleteSyncObject(syncObjects.remove());
maybeOnReadyToAcceptInputFrame();
inputListener.onReadyToAcceptInputFrame();
}
}

Expand Down Expand Up @@ -257,7 +265,12 @@ public void flush() {
defaultShaderProgram.flush();
}
inputListener.onFlush();
maybeOnReadyToAcceptInputFrame();
if (textureOutputListener == null) {
// TODO: b/293572152 - Add texture output flush() support, propagating the flush() signal to
// downstream components so that they can release TexturePool resources and FinalWrapper can
// call onReadyToAcceptInputFrame().
inputListener.onReadyToAcceptInputFrame();
}
}

@Override
Expand Down Expand Up @@ -316,12 +329,6 @@ public synchronized void setOutputSurfaceInfo(@Nullable SurfaceInfo outputSurfac
this.outputSurfaceInfo = outputSurfaceInfo;
}

private void maybeOnReadyToAcceptInputFrame() {
  if (textureOutputListener != null && outputTexturePool.freeTextureCount() == 0) {
    // Texture output is in use and no output texture is free: not ready for more input yet.
    return;
  }
  inputListener.onReadyToAcceptInputFrame();
}

private synchronized void renderFrame(
GlObjectsProvider glObjectsProvider,
GlTextureInfo inputTexture,
Expand Down
Loading

0 comments on commit 97bfbc4

Please sign in to comment.