Skip to content

Commit

Permalink
Fix delivery of onLoadCanceled to occur before thread dies.
Browse files Browse the repository at this point in the history
This removes "message sent on dead thread" warnings in nearly
all cases, and guarantees delivery of load cancelation to event
listeners.

Issue: #426
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=123957691
  • Loading branch information
ojw28 committed Jun 15, 2016
1 parent 8e717e2 commit 7ef028c
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ public void seekToUs(long positionUs) {

@Override
public void release() {
streamState = STREAM_STATE_END_OF_STREAM;
loader.release();
}

Expand Down Expand Up @@ -221,8 +220,10 @@ public void onLoadCompleted(SingleSampleSource loadable, long elapsedMs) {
}

@Override
public void onLoadCanceled(SingleSampleSource loadable, long elapsedMs) {
maybeStartLoading();
public void onLoadCanceled(SingleSampleSource loadable, long elapsedMs, boolean released) {
if (!released) {
maybeStartLoading();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public class ChunkTrackStream<T extends ChunkSource> implements TrackStream,
private long pendingResetPositionUs;

private boolean loadingFinished;
private boolean released;

/**
* @param chunkSource A {@link ChunkSource} from which chunks to load are obtained.
Expand Down Expand Up @@ -233,7 +232,7 @@ public void onLoadCompleted(Chunk loadable, long elapsedMs) {
}

@Override
public void onLoadCanceled(Chunk loadable, long elapsedMs) {
public void onLoadCanceled(Chunk loadable, long elapsedMs, boolean released) {
eventDispatcher.loadCanceled(loadable.bytesLoaded());
if (!released) {
restartFrom(pendingResetPositionUs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,8 @@ public void onLoadCompleted(UriLoadable<MediaPresentationDescription> loadable,
}

@Override
public void onLoadCanceled(UriLoadable<MediaPresentationDescription> loadable, long elapsedMs) {
public void onLoadCanceled(UriLoadable<MediaPresentationDescription> loadable, long elapsedMs,
boolean released) {
// Do nothing.
}

Expand All @@ -380,7 +381,7 @@ public void onLoadCompleted(UriLoadable<Long> loadable, long elapsedMs) {
}

@Override
public void onLoadCanceled(UriLoadable<Long> loadable, long elapsedMs) {
public void onLoadCanceled(UriLoadable<Long> loadable, long elapsedMs, boolean released) {
// Do nothing.
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,6 @@ public void release() {
for (DefaultTrackOutput sampleQueue : sampleQueues) {
sampleQueue.disable();
}
enabledTrackCount = 0;
loader.release();
}

Expand Down Expand Up @@ -510,9 +509,9 @@ public void onLoadCompleted(ExtractingLoadable loadable, long elapsedMs) {
}

@Override
public void onLoadCanceled(ExtractingLoadable loadable, long elapsedMs) {
public void onLoadCanceled(ExtractingLoadable loadable, long elapsedMs, boolean released) {
copyLengthFromLoader(loadable);
if (enabledTrackCount > 0) {
if (!released && enabledTrackCount > 0) {
restartFrom(pendingResetPositionUs);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ public void onLoadCompleted(UriLoadable<HlsPlaylist> loadable, long elapsedMs) {
}

@Override
public void onLoadCanceled(UriLoadable<HlsPlaylist> loadable, long elapsedMs) {
public void onLoadCanceled(UriLoadable<HlsPlaylist> loadable, long elapsedMs, boolean released) {
// Do nothing.
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,6 @@ public void release() {
sampleQueues.valueAt(i).disable();
}
if (enabledTrackCount > 0) {
enabledTrackCount = 0;
loadControl.unregister(this);
}
loader.release();
Expand Down Expand Up @@ -331,9 +330,9 @@ public void onLoadCompleted(Chunk loadable, long elapsedMs) {
}

@Override
public void onLoadCanceled(Chunk loadable, long elapsedMs) {
public void onLoadCanceled(Chunk loadable, long elapsedMs, boolean released) {
eventDispatcher.loadCanceled(loadable.bytesLoaded());
if (enabledTrackCount > 0) {
if (!released && enabledTrackCount > 0) {
restartFrom(pendingResetPositionUs);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ public void onLoadCompleted(UriLoadable<SmoothStreamingManifest> loadable, long
}

@Override
public void onLoadCanceled(UriLoadable<SmoothStreamingManifest> loadable, long elapsedMs) {
public void onLoadCanceled(UriLoadable<SmoothStreamingManifest> loadable, long elapsedMs,
boolean released) {
// Do nothing.
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,24 @@ public interface Callback<T extends Loadable> {

/**
* Invoked when a load has been canceled.
* <p>
* If the {@link Loader} has not been released then there is guaranteed to exist a memory
* barrier between {@link Loadable#load()} exiting and this callback being invoked. If the
* {@link Loader} has been released then this callback may be invoked before
* {@link Loadable#load()} exits.
*
* @param loadable The loadable whose load has been canceled.
* @param elapsedMs The elapsed time in milliseconds since loading started.
* @param released True if the load was canceled because the {@link Loader} was released. False
* otherwise.
*/
void onLoadCanceled(T loadable, long elapsedMs);
void onLoadCanceled(T loadable, long elapsedMs, boolean released);

/**
* Invoked when a load has completed.
* <p>
* There is guaranteed to exist a memory barrier between {@link Loadable#load()} exiting and
* this callback being invoked.
*
* @param loadable The loadable whose load has completed.
* @param elapsedMs The elapsed time in milliseconds since loading started.
Expand All @@ -95,6 +105,9 @@ public interface Callback<T extends Loadable> {

/**
* Invoked when a load encounters an error.
* <p>
* There is guaranteed to exist a memory barrier between {@link Loadable#load()} exiting and
* this callback being invoked.
*
* @param loadable The loadable whose load has encountered an error.
* @param elapsedMs The elapsed time in milliseconds since loading started.
Expand Down Expand Up @@ -179,7 +192,7 @@ public void maybeThrowError() throws IOException {
* This method should only be called when a load is in progress.
*/
public void cancelLoading() {
currentTask.cancel();
currentTask.cancel(false);
}

/**
Expand All @@ -189,7 +202,7 @@ public void cancelLoading() {
*/
public void release() {
if (currentTask != null) {
cancelLoading();
currentTask.cancel(true);
}
downloadExecutorService.shutdown();
}
Expand All @@ -208,6 +221,7 @@ private final class LoadTask<T extends Loadable> extends Handler implements Runn
private int errorCount;

private volatile Thread executorThread;
private volatile boolean released;

public LoadTask(Looper looper, T loadable, Loader.Callback<T> callback, int minRetryCount) {
super(looper);
Expand All @@ -233,17 +247,24 @@ public void start(long delayMillis) {
}
}

public void cancel() {
public void cancel(boolean released) {
this.released = released;
currentError = null;
if (hasMessages(MSG_START)) {
removeMessages(MSG_START);
sendEmptyMessage(MSG_CANCEL);
if (!released) {
sendEmptyMessage(MSG_CANCEL);
}
} else {
loadable.cancelLoad();
if (executorThread != null) {
executorThread.interrupt();
}
}
if (released) {
finish();
callback.onLoadCanceled(loadable, SystemClock.elapsedRealtime() - startTimeMs, true);
}
}

@Override
Expand All @@ -258,29 +279,42 @@ public void run() {
TraceUtil.endSection();
}
}
sendEmptyMessage(MSG_END_OF_SOURCE);
if (!released) {
sendEmptyMessage(MSG_END_OF_SOURCE);
}
} catch (IOException e) {
obtainMessage(MSG_IO_EXCEPTION, e).sendToTarget();
if (!released) {
obtainMessage(MSG_IO_EXCEPTION, e).sendToTarget();
}
} catch (InterruptedException e) {
// The load was canceled.
Assertions.checkState(loadable.isLoadCanceled());
sendEmptyMessage(MSG_END_OF_SOURCE);
if (!released) {
sendEmptyMessage(MSG_END_OF_SOURCE);
}
} catch (Exception e) {
// This should never happen, but handle it anyway.
Log.e(TAG, "Unexpected exception loading stream", e);
obtainMessage(MSG_IO_EXCEPTION, new UnexpectedLoaderException(e)).sendToTarget();
if (!released) {
obtainMessage(MSG_IO_EXCEPTION, new UnexpectedLoaderException(e)).sendToTarget();
}
} catch (Error e) {
// We'd hope that the platform would kill the process if an Error is thrown here, but the
// executor may catch the error (b/20616433). Throw it here, but also pass and throw it from
// the handler thread so that the process dies even if the executor behaves in this way.
Log.e(TAG, "Unexpected error loading stream", e);
obtainMessage(MSG_FATAL_ERROR, e).sendToTarget();
if (!released) {
obtainMessage(MSG_FATAL_ERROR, e).sendToTarget();
}
throw e;
}
}

@Override
public void handleMessage(Message msg) {
if (released) {
return;
}
if (msg.what == MSG_START) {
submitToExecutor();
return;
Expand All @@ -291,12 +325,12 @@ public void handleMessage(Message msg) {
finish();
long elapsedMs = SystemClock.elapsedRealtime() - startTimeMs;
if (loadable.isLoadCanceled()) {
callback.onLoadCanceled(loadable, elapsedMs);
callback.onLoadCanceled(loadable, elapsedMs, false);
return;
}
switch (msg.what) {
case MSG_CANCEL:
callback.onLoadCanceled(loadable, elapsedMs);
callback.onLoadCanceled(loadable, elapsedMs, false);
break;
case MSG_END_OF_SOURCE:
callback.onLoadCompleted(loadable, elapsedMs);
Expand Down

0 comments on commit 7ef028c

Please sign in to comment.