From d88f5f47e6f45b24b9eb630f0bb006fcd9f511e1 Mon Sep 17 00:00:00 2001 From: olly Date: Tue, 26 May 2020 18:42:18 +0100 Subject: [PATCH] Turn CacheUtil into stateful CacheWriter - The new CacheWriter is simplified somewhat - Blocking on PriorityTaskManager.proceed is moved out of CacheWriter and into the Downloader tasks. This is because we want to shift only the caching parts of the Downloaders onto their Executors, whilst keeping the blocking parts on the main Downloader threads. Else we can end up "using" the Executor threads indefinitely whilst they're blocked. Issue: #5978 PiperOrigin-RevId: 313222923 --- .../offline/ProgressiveDownloader.java | 39 ++- .../exoplayer2/offline/SegmentDownloader.java | 39 ++- .../exoplayer2/upstream/cache/CacheUtil.java | 312 ------------------ .../upstream/cache/CacheWriter.java | 237 +++++++++++++ .../upstream/cache/CacheDataSourceTest.java | 44 ++- ...acheUtilTest.java => CacheWriterTest.java} | 118 +++++-- 6 files changed, 422 insertions(+), 367 deletions(-) delete mode 100644 library/core/src/main/java/com/google/android/exoplayer2/upstream/cache/CacheUtil.java create mode 100644 library/core/src/main/java/com/google/android/exoplayer2/upstream/cache/CacheWriter.java rename library/core/src/test/java/com/google/android/exoplayer2/upstream/cache/{CacheUtilTest.java => CacheWriterTest.java} (71%) diff --git a/library/core/src/main/java/com/google/android/exoplayer2/offline/ProgressiveDownloader.java b/library/core/src/main/java/com/google/android/exoplayer2/offline/ProgressiveDownloader.java index 434ca8fd5d7..42e2c7e84d2 100644 --- a/library/core/src/main/java/com/google/android/exoplayer2/offline/ProgressiveDownloader.java +++ b/library/core/src/main/java/com/google/android/exoplayer2/offline/ProgressiveDownloader.java @@ -21,9 +21,11 @@ import com.google.android.exoplayer2.MediaItem; import com.google.android.exoplayer2.upstream.DataSpec; import com.google.android.exoplayer2.upstream.cache.CacheDataSource; -import com.google.android.exoplayer2.upstream.cache.CacheUtil; +import com.google.android.exoplayer2.upstream.cache.CacheWriter; +import com.google.android.exoplayer2.upstream.cache.CacheWriter.ProgressListener; import com.google.android.exoplayer2.util.Assertions; import com.google.android.exoplayer2.util.PriorityTaskManager; +import com.google.android.exoplayer2.util.PriorityTaskManager.PriorityTooLowException; import java.io.IOException; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -31,8 +33,6 @@ /** A downloader for progressive media streams. */ public final class ProgressiveDownloader implements Downloader { - private static final int BUFFER_SIZE_BYTES = 128 * 1024; - private final DataSpec dataSpec; private final CacheDataSource dataSource; private final AtomicBoolean isCanceled; @@ -104,18 +104,35 @@ public void download(@Nullable ProgressListener progressListener) throws IOExcep if (isCanceled.get()) { return; } + + CacheWriter cacheWriter = + new CacheWriter( + dataSource, + dataSpec, + /* allowShortContent= */ false, + isCanceled, + /* temporaryBuffer= */ null, + progressListener == null ? null : new ProgressForwarder(progressListener)); + @Nullable PriorityTaskManager priorityTaskManager = dataSource.getUpstreamPriorityTaskManager(); if (priorityTaskManager != null) { priorityTaskManager.add(C.PRIORITY_DOWNLOAD); } try { - CacheUtil.cache( - dataSource, - dataSpec, - progressListener == null ? null : new ProgressForwarder(progressListener), - isCanceled, - /* enableEOFException= */ true, - /* temporaryBuffer= */ new byte[BUFFER_SIZE_BYTES]); + boolean finished = false; + while (!finished && !isCanceled.get()) { + if (priorityTaskManager != null) { + priorityTaskManager.proceed(dataSource.getUpstreamPriority()); + } + try { + cacheWriter.cache(); + finished = true; + } catch (PriorityTooLowException e) { + // The next loop iteration will block until the task is able to proceed. + } + } + } catch (InterruptedException e) { + // The download was canceled. } finally { if (priorityTaskManager != null) { priorityTaskManager.remove(C.PRIORITY_DOWNLOAD); @@ -137,7 +154,7 @@ public void remove() { dataSource.getCache().removeResource(dataSource.getCacheKeyFactory().buildCacheKey(dataSpec)); } - private static final class ProgressForwarder implements CacheUtil.ProgressListener { + private static final class ProgressForwarder implements CacheWriter.ProgressListener { private final ProgressListener progressListener; diff --git a/library/core/src/main/java/com/google/android/exoplayer2/offline/SegmentDownloader.java b/library/core/src/main/java/com/google/android/exoplayer2/offline/SegmentDownloader.java index 02337248e15..28ed994168d 100644 --- a/library/core/src/main/java/com/google/android/exoplayer2/offline/SegmentDownloader.java +++ b/library/core/src/main/java/com/google/android/exoplayer2/offline/SegmentDownloader.java @@ -28,10 +28,11 @@ import com.google.android.exoplayer2.upstream.cache.Cache; import com.google.android.exoplayer2.upstream.cache.CacheDataSource; import com.google.android.exoplayer2.upstream.cache.CacheKeyFactory; -import com.google.android.exoplayer2.upstream.cache.CacheUtil; +import com.google.android.exoplayer2.upstream.cache.CacheWriter; import com.google.android.exoplayer2.upstream.cache.ContentMetadata; import com.google.android.exoplayer2.util.Assertions; import com.google.android.exoplayer2.util.PriorityTaskManager; +import com.google.android.exoplayer2.util.PriorityTaskManager.PriorityTooLowException; import com.google.android.exoplayer2.util.Util; import java.io.IOException; import java.util.ArrayList; @@ -175,18 +176,32 @@ public final void download(@Nullable ProgressListener progressListener) throws I segmentsDownloaded) : null; byte[] temporaryBuffer = new byte[BUFFER_SIZE_BYTES]; - for (int i = 0; i < segments.size(); i++) { - CacheUtil.cache( - dataSource, - segments.get(i).dataSpec, - progressNotifier, - isCanceled, - /* enableEOFException= */ true, - temporaryBuffer); - if (progressNotifier != null) { - progressNotifier.onSegmentDownloaded(); + int segmentIndex = 0; + while (!isCanceled.get() && segmentIndex < segments.size()) { + if (priorityTaskManager != null) { + priorityTaskManager.proceed(dataSource.getUpstreamPriority()); + } + CacheWriter cacheWriter = + new CacheWriter( + dataSource, + segments.get(segmentIndex).dataSpec, + /* allowShortContent= */ false, + isCanceled, + temporaryBuffer, + progressNotifier); + try { + cacheWriter.cache(); + segmentIndex++; + if (progressNotifier != null) { + progressNotifier.onSegmentDownloaded(); + } + } catch (PriorityTooLowException e) { + // The next loop iteration will block until the task is able to proceed, then try and + // download the same segment again. } } + } catch (InterruptedException e) { + // The download was canceled. } finally { if (priorityTaskManager != null) { priorityTaskManager.remove(C.PRIORITY_DOWNLOAD); @@ -293,7 +308,7 @@ private static boolean canMergeSegments(DataSpec dataSpec1, DataSpec dataSpec2) && dataSpec1.httpRequestHeaders.equals(dataSpec2.httpRequestHeaders); } - private static final class ProgressNotifier implements CacheUtil.ProgressListener { + private static final class ProgressNotifier implements CacheWriter.ProgressListener { private final ProgressListener progressListener; diff --git a/library/core/src/main/java/com/google/android/exoplayer2/upstream/cache/CacheUtil.java b/library/core/src/main/java/com/google/android/exoplayer2/upstream/cache/CacheUtil.java deleted file mode 100644 index 1e850df278c..00000000000 --- a/library/core/src/main/java/com/google/android/exoplayer2/upstream/cache/CacheUtil.java +++ /dev/null @@ -1,312 +0,0 @@ -/* - * Copyright (C) 2017 The Android Open Source Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.android.exoplayer2.upstream.cache; - -import androidx.annotation.Nullable; -import androidx.annotation.WorkerThread; -import com.google.android.exoplayer2.C; -import com.google.android.exoplayer2.upstream.DataSource; -import com.google.android.exoplayer2.upstream.DataSourceException; -import com.google.android.exoplayer2.upstream.DataSpec; -import com.google.android.exoplayer2.util.Assertions; -import com.google.android.exoplayer2.util.PriorityTaskManager; -import com.google.android.exoplayer2.util.Util; -import java.io.EOFException; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Caching related utility methods. - */ -public final class CacheUtil { - - /** Receives progress updates during cache operations. */ - public interface ProgressListener { - - /** - * Called when progress is made during a cache operation. - * - * @param requestLength The length of the content being cached in bytes, or {@link - * C#LENGTH_UNSET} if unknown. - * @param bytesCached The number of bytes that are cached. - * @param newBytesCached The number of bytes that have been newly cached since the last progress - * update. - */ - void onProgress(long requestLength, long bytesCached, long newBytesCached); - } - - /** Default buffer size to be used while caching. */ - public static final int DEFAULT_BUFFER_SIZE_BYTES = 128 * 1024; - - /** @deprecated Use {@link CacheKeyFactory#DEFAULT}. */ - @Deprecated - public static final CacheKeyFactory DEFAULT_CACHE_KEY_FACTORY = CacheKeyFactory.DEFAULT; - - /** - * Caches the data defined by {@code dataSpec}, skipping already cached data. Caching stops early - * if the end of the input is reached. - * - *

To cancel the operation, the caller should both set {@code isCanceled} to true and interrupt - * the calling thread. - * - *

This method may be slow and shouldn't normally be called on the main thread. - * - * @param cache A {@link Cache} to store the data. - * @param dataSpec Defines the data to be cached. - * @param upstreamDataSource A {@link DataSource} for reading data not in the cache. - * @param progressListener A listener to receive progress updates, or {@code null}. - * @param isCanceled An optional flag that will cancel the operation if set to true. - * @throws IOException If an error occurs caching the data, or if the operation was canceled. - */ - @WorkerThread - public static void cache( - Cache cache, - DataSpec dataSpec, - DataSource upstreamDataSource, - @Nullable ProgressListener progressListener, - @Nullable AtomicBoolean isCanceled) - throws IOException { - cache( - new CacheDataSource(cache, upstreamDataSource), - dataSpec, - progressListener, - isCanceled, - /* enableEOFException= */ false, - new byte[DEFAULT_BUFFER_SIZE_BYTES]); - } - - /** - * Caches the data defined by {@code dataSpec}, skipping already cached data. Caching stops early - * if end of input is reached and {@code enableEOFException} is false. - * - *

If {@code dataSource} has a {@link PriorityTaskManager}, then it's the responsibility of the - * calling code to call {@link PriorityTaskManager#add} to register with the manager before - * calling this method, and to call {@link PriorityTaskManager#remove} afterwards to unregister. - * - *

To cancel the operation, the caller should both set {@code isCanceled} to true and interrupt - * the calling thread. - * - *

This method may be slow and shouldn't normally be called on the main thread. - * - * @param dataSource A {@link CacheDataSource} to be used for caching the data. - * @param dataSpec Defines the data to be cached. - * @param progressListener A listener to receive progress updates, or {@code null}. - * @param isCanceled An optional flag that will cancel the operation if set to true. - * @param enableEOFException Whether to throw an {@link EOFException} if end of input has been - * reached unexpectedly. - * @param temporaryBuffer A temporary buffer to be used during caching. - * @throws IOException If an error occurs caching the data, or if the operation was canceled. - */ - @WorkerThread - public static void cache( - CacheDataSource dataSource, - DataSpec dataSpec, - @Nullable ProgressListener progressListener, - @Nullable AtomicBoolean isCanceled, - boolean enableEOFException, - byte[] temporaryBuffer) - throws IOException { - Assertions.checkNotNull(dataSource); - Assertions.checkNotNull(temporaryBuffer); - - Cache cache = dataSource.getCache(); - String cacheKey = dataSource.getCacheKeyFactory().buildCacheKey(dataSpec); - long requestLength = dataSpec.length; - if (requestLength == C.LENGTH_UNSET) { - long resourceLength = ContentMetadata.getContentLength(cache.getContentMetadata(cacheKey)); - if (resourceLength != C.LENGTH_UNSET) { - requestLength = resourceLength - dataSpec.position; - } - } - long bytesCached = cache.getCachedBytes(cacheKey, dataSpec.position, requestLength); - @Nullable ProgressNotifier progressNotifier = null; - if (progressListener != null) { - progressNotifier = new ProgressNotifier(progressListener); - progressNotifier.init(requestLength, bytesCached); - } - - long position = dataSpec.position; - long bytesLeft = requestLength; - while (bytesLeft != 0) { - throwExceptionIfCanceled(isCanceled); - long blockLength = cache.getCachedLength(cacheKey, position, bytesLeft); - if (blockLength > 0) { - // Skip already cached data. - } else { - // There is a hole in the cache which is at least "-blockLength" long. - blockLength = -blockLength; - long length = blockLength == Long.MAX_VALUE ? C.LENGTH_UNSET : blockLength; - boolean isLastBlock = length == bytesLeft; - long read = - readAndDiscard( - dataSpec, - position, - length, - dataSource, - isCanceled, - progressNotifier, - isLastBlock, - temporaryBuffer); - if (read < blockLength) { - // Reached to the end of the data. - if (enableEOFException && bytesLeft != C.LENGTH_UNSET) { - throw new EOFException(); - } - break; - } - } - position += blockLength; - if (bytesLeft != C.LENGTH_UNSET) { - bytesLeft -= blockLength; - } - } - } - - /** - * Reads and discards all data specified by the {@code dataSpec}. - * - * @param dataSpec Defines the data to be read. The {@code position} and {@code length} fields are - * overwritten by the following parameters. - * @param position The position of the data to be read. - * @param length Length of the data to be read, or {@link C#LENGTH_UNSET} if it is unknown. - * @param dataSource The {@link CacheDataSource} to read the data from. - * @param isCanceled An optional flag that will cancel the operation if set to true. - * @param progressNotifier A notifier through which to report progress updates, or {@code null}. - * @param isLastBlock Whether this read block is the last block of the content. - * @param temporaryBuffer A temporary buffer to be used during caching. - * @return Number of read bytes, or 0 if no data is available because the end of the opened range - * has been reached. - * @param isCanceled An optional flag that will cancel the operation if set to true. - */ - private static long readAndDiscard( - DataSpec dataSpec, - long position, - long length, - CacheDataSource dataSource, - @Nullable AtomicBoolean isCanceled, - @Nullable ProgressNotifier progressNotifier, - boolean isLastBlock, - byte[] temporaryBuffer) - throws IOException { - long positionOffset = position - dataSpec.position; - long initialPositionOffset = positionOffset; - long endOffset = length != C.LENGTH_UNSET ? positionOffset + length : C.POSITION_UNSET; - @Nullable PriorityTaskManager priorityTaskManager = dataSource.getUpstreamPriorityTaskManager(); - while (true) { - if (priorityTaskManager != null) { - // Wait for any other thread with higher priority to finish its job. - try { - priorityTaskManager.proceed(dataSource.getUpstreamPriority()); - } catch (InterruptedException e) { - throw new InterruptedIOException(); - } - } - throwExceptionIfCanceled(isCanceled); - try { - long resolvedLength = C.LENGTH_UNSET; - boolean isDataSourceOpen = false; - if (endOffset != C.POSITION_UNSET) { - // If a specific length is given, first try to open the data source for that length to - // avoid more data then required to be requested. If the given length exceeds the end of - // input we will get a "position out of range" error. In that case try to open the source - // again with unset length. - try { - resolvedLength = - dataSource.open(dataSpec.subrange(positionOffset, endOffset - positionOffset)); - isDataSourceOpen = true; - } catch (IOException exception) { - if (!isLastBlock || !DataSourceException.isCausedByPositionOutOfRange(exception)) { - throw exception; - } - Util.closeQuietly(dataSource); - } - } - if (!isDataSourceOpen) { - resolvedLength = dataSource.open(dataSpec.subrange(positionOffset, C.LENGTH_UNSET)); - } - if (isLastBlock && progressNotifier != null && resolvedLength != C.LENGTH_UNSET) { - progressNotifier.onRequestLengthResolved(positionOffset + resolvedLength); - } - while (positionOffset != endOffset) { - throwExceptionIfCanceled(isCanceled); - int bytesRead = - dataSource.read( - temporaryBuffer, - 0, - endOffset != C.POSITION_UNSET - ? (int) Math.min(temporaryBuffer.length, endOffset - positionOffset) - : temporaryBuffer.length); - if (bytesRead == C.RESULT_END_OF_INPUT) { - if (progressNotifier != null) { - progressNotifier.onRequestLengthResolved(positionOffset); - } - break; - } - positionOffset += bytesRead; - if (progressNotifier != null) { - progressNotifier.onBytesCached(bytesRead); - } - } - return positionOffset - initialPositionOffset; - } catch (PriorityTaskManager.PriorityTooLowException exception) { - // catch and try again - } finally { - Util.closeQuietly(dataSource); - } - } - } - - private static void throwExceptionIfCanceled(@Nullable AtomicBoolean isCanceled) - throws InterruptedIOException { - if (isCanceled != null && isCanceled.get()) { - throw new InterruptedIOException(); - } - } - - private CacheUtil() {} - - private static final class ProgressNotifier { - /** The listener to notify when progress is made. */ - private final ProgressListener listener; - /** The length of the content being cached in bytes, or {@link C#LENGTH_UNSET} if unknown. */ - private long requestLength; - /** The number of bytes that are cached. */ - private long bytesCached; - - public ProgressNotifier(ProgressListener listener) { - this.listener = listener; - } - - public void init(long requestLength, long bytesCached) { - this.requestLength = requestLength; - this.bytesCached = bytesCached; - listener.onProgress(requestLength, bytesCached, /* newBytesCached= */ 0); - } - - public void onRequestLengthResolved(long requestLength) { - if (this.requestLength == C.LENGTH_UNSET && requestLength != C.LENGTH_UNSET) { - this.requestLength = requestLength; - listener.onProgress(requestLength, bytesCached, /* newBytesCached= */ 0); - } - } - - public void onBytesCached(long newBytesCached) { - bytesCached += newBytesCached; - listener.onProgress(requestLength, bytesCached, newBytesCached); - } - } -} diff --git a/library/core/src/main/java/com/google/android/exoplayer2/upstream/cache/CacheWriter.java b/library/core/src/main/java/com/google/android/exoplayer2/upstream/cache/CacheWriter.java new file mode 100644 index 00000000000..ee44b0dc51a --- /dev/null +++ b/library/core/src/main/java/com/google/android/exoplayer2/upstream/cache/CacheWriter.java @@ -0,0 +1,237 @@ +/* + * Copyright (C) 2017 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.android.exoplayer2.upstream.cache; + +import androidx.annotation.Nullable; +import androidx.annotation.WorkerThread; +import com.google.android.exoplayer2.C; +import com.google.android.exoplayer2.upstream.DataSourceException; +import com.google.android.exoplayer2.upstream.DataSpec; +import com.google.android.exoplayer2.util.PriorityTaskManager; +import com.google.android.exoplayer2.util.PriorityTaskManager.PriorityTooLowException; +import com.google.android.exoplayer2.util.Util; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.concurrent.atomic.AtomicBoolean; + +/** Caching related utility methods. */ +public final class CacheWriter { + + /** Receives progress updates during cache operations. */ + public interface ProgressListener { + + /** + * Called when progress is made during a cache operation. + * + * @param requestLength The length of the content being cached in bytes, or {@link + * C#LENGTH_UNSET} if unknown. + * @param bytesCached The number of bytes that are cached. + * @param newBytesCached The number of bytes that have been newly cached since the last progress + * update. + */ + void onProgress(long requestLength, long bytesCached, long newBytesCached); + } + + /** Default buffer size to be used while caching. */ + public static final int DEFAULT_BUFFER_SIZE_BYTES = 128 * 1024; + + private final CacheDataSource dataSource; + private final Cache cache; + private final DataSpec dataSpec; + private final boolean allowShortContent; + private final AtomicBoolean isCanceled; + private final String cacheKey; + private final byte[] temporaryBuffer; + @Nullable private final ProgressListener progressListener; + + private boolean initialized; + private long nextPosition; + private long endPosition; + private long bytesCached; + + /** + * @param dataSource A {@link CacheDataSource} that writes to the target cache. + * @param dataSpec Defines the data to be written. + * @param allowShortContent Whether it's allowed for the content to end before the request as + * defined by the {@link DataSpec}. If {@code true} and the request exceeds the length of the + * content, then the content will be cached to the end. If {@code false} and the request + * exceeds the length of the content, {@link #cache} will throw an {@link IOException}. + * @param isCanceled An optional cancelation signal. If specified, {@link #cache} will check the + * value of this signal frequently during caching. If the value is {@code true}, the operation + * will be considered canceled and {@link #cache} will throw {@link InterruptedIOException}. + * @param temporaryBuffer A temporary buffer to be used during caching, or {@code null} if the + * writer should instantiate its own internal temporary buffer. + * @param progressListener An optional progress listener. + */ + public CacheWriter( + CacheDataSource dataSource, + DataSpec dataSpec, + boolean allowShortContent, + @Nullable AtomicBoolean isCanceled, + @Nullable byte[] temporaryBuffer, + @Nullable ProgressListener progressListener) { + this.dataSource = dataSource; + this.cache = dataSource.getCache(); + this.dataSpec = dataSpec; + this.allowShortContent = allowShortContent; + this.isCanceled = isCanceled == null ? new AtomicBoolean() : isCanceled; + this.temporaryBuffer = + temporaryBuffer == null ? new byte[DEFAULT_BUFFER_SIZE_BYTES] : temporaryBuffer; + this.progressListener = progressListener; + cacheKey = dataSource.getCacheKeyFactory().buildCacheKey(dataSpec); + nextPosition = dataSpec.position; + } + + /** + * Caches the requested data, skipping any that's already cached. + * + *

If the {@link CacheDataSource} used by the writer has a {@link PriorityTaskManager}, then + * it's the responsibility of the caller to call {@link PriorityTaskManager#add} to register with + * the manager before calling this method, and to call {@link PriorityTaskManager#remove} + * afterwards to unregister. {@link PriorityTooLowException} will be thrown if the priority + * required by the {@link CacheDataSource} is not high enough for progress to be made. + * + *

This method may be slow and shouldn't normally be called on the main thread. + * + * @throws IOException If an error occurs reading the data, or writing the data into the cache, or + * if the operation is canceled. If canceled, an {@link InterruptedIOException} is thrown. The + * method may be called again to continue the operation from where the error occurred. + */ + @WorkerThread + public void cache() throws IOException { + throwIfCanceled(); + + if (!initialized) { + if (dataSpec.length != C.LENGTH_UNSET) { + endPosition = dataSpec.position + dataSpec.length; + } else { + long contentLength = ContentMetadata.getContentLength(cache.getContentMetadata(cacheKey)); + endPosition = contentLength == C.LENGTH_UNSET ? C.POSITION_UNSET : contentLength; + } + bytesCached = cache.getCachedBytes(cacheKey, dataSpec.position, dataSpec.length); + if (progressListener != null) { + progressListener.onProgress(getLength(), bytesCached, /* newBytesCached= */ 0); + } + initialized = true; + } + + while (endPosition == C.POSITION_UNSET || nextPosition < endPosition) { + throwIfCanceled(); + long maxRemainingLength = + endPosition == C.POSITION_UNSET ? Long.MAX_VALUE : endPosition - nextPosition; + long blockLength = cache.getCachedLength(cacheKey, nextPosition, maxRemainingLength); + if (blockLength > 0) { + nextPosition += blockLength; + } else { + // There's a hole of length -blockLength. + blockLength = -blockLength; + long nextRequestLength = blockLength == Long.MAX_VALUE ? C.LENGTH_UNSET : blockLength; + nextPosition += readBlockToCache(nextPosition, nextRequestLength); + } + } + } + + /** + * Reads the specified block of data, writing it into the cache. + * + * @param position The starting position of the block. + * @param length The length of the block, or {@link C#LENGTH_UNSET} if unbounded. + * @return The number of bytes read. + * @throws IOException If an error occurs reading the data or writing it to the cache. + */ + private long readBlockToCache(long position, long length) throws IOException { + boolean isLastBlock = position + length == endPosition || length == C.LENGTH_UNSET; + try { + long resolvedLength = C.LENGTH_UNSET; + boolean isDataSourceOpen = false; + if (length != C.LENGTH_UNSET) { + // If the length is specified, try to open the data source with a bounded request to avoid + // the underlying network stack requesting more data than required. + try { + DataSpec boundedDataSpec = + dataSpec.buildUpon().setPosition(position).setLength(length).build(); + resolvedLength = dataSource.open(boundedDataSpec); + isDataSourceOpen = true; + } catch (IOException exception) { + if (allowShortContent + && isLastBlock + && DataSourceException.isCausedByPositionOutOfRange(exception)) { + // The length of the request exceeds the length of the content. If we allow shorter + // content and are reading the last block, fall through and try again with an unbounded + // request to read up to the end of the content. + Util.closeQuietly(dataSource); + } else { + throw exception; + } + } + } + if (!isDataSourceOpen) { + // Either the length was unspecified, or we allow short content and our attempt to open the + // DataSource with the specified length failed. + throwIfCanceled(); + DataSpec unboundedDataSpec = + dataSpec.buildUpon().setPosition(position).setLength(C.LENGTH_UNSET).build(); + resolvedLength = dataSource.open(unboundedDataSpec); + } + if (isLastBlock && resolvedLength != C.LENGTH_UNSET) { + onRequestEndPosition(position + resolvedLength); + } + int totalBytesRead = 0; + int bytesRead = 0; + while (bytesRead != C.RESULT_END_OF_INPUT) { + throwIfCanceled(); + bytesRead = dataSource.read(temporaryBuffer, /* offset= */ 0, temporaryBuffer.length); + if (bytesRead != C.RESULT_END_OF_INPUT) { + onNewBytesCached(bytesRead); + totalBytesRead += bytesRead; + } + } + if (isLastBlock) { + onRequestEndPosition(position + totalBytesRead); + } + return totalBytesRead; + } finally { + Util.closeQuietly(dataSource); + } + } + + private void onRequestEndPosition(long endPosition) { + if (this.endPosition == endPosition) { + return; + } + this.endPosition = endPosition; + if (progressListener != null) { + progressListener.onProgress(getLength(), bytesCached, /* newBytesCached= */ 0); + } + } + + private void onNewBytesCached(long newBytesCached) { + bytesCached += newBytesCached; + if (progressListener != null) { + progressListener.onProgress(getLength(), bytesCached, newBytesCached); + } + } + + private long getLength() { + return endPosition == C.POSITION_UNSET ? C.LENGTH_UNSET : endPosition - dataSpec.position; + } + + private void throwIfCanceled() throws InterruptedIOException { + if (isCanceled.get()) { + throw new InterruptedIOException(); + } + } +} diff --git a/library/core/src/test/java/com/google/android/exoplayer2/upstream/cache/CacheDataSourceTest.java b/library/core/src/test/java/com/google/android/exoplayer2/upstream/cache/CacheDataSourceTest.java index b4c259689e9..d8a7a034066 100644 --- a/library/core/src/test/java/com/google/android/exoplayer2/upstream/cache/CacheDataSourceTest.java +++ b/library/core/src/test/java/com/google/android/exoplayer2/upstream/cache/CacheDataSourceTest.java @@ -357,8 +357,15 @@ public void switchToCacheSourceWithReadOnlyCacheDataSource() throws Exception { .newDefaultData() .appendReadData(1024 * 1024) .endData()); - CacheUtil.cache( - cache, unboundedDataSpec, upstream2, /* progressListener= */ null, /* isCanceled= */ null); + CacheWriter cacheWriter = + new CacheWriter( + new CacheDataSource(cache, upstream2), + unboundedDataSpec, + /* allowShortContent= */ false, + /* isCanceled= */ null, + /* temporaryBuffer= */ null, + /* progressListener= */ null); + cacheWriter.cache(); // Read the rest of the data. TestUtil.readToEnd(cacheDataSource); @@ -401,8 +408,15 @@ public void switchToCacheSourceWithNonBlockingCacheDataSource() throws Exception .newDefaultData() .appendReadData(1024 * 1024) .endData()); - CacheUtil.cache( - cache, unboundedDataSpec, upstream2, /* progressListener= */ null, /* isCanceled= */ null); + CacheWriter cacheWriter = + new CacheWriter( + new CacheDataSource(cache, upstream2), + unboundedDataSpec, + /* allowShortContent= */ false, + /* isCanceled= */ null, + /* temporaryBuffer= */ null, + /* progressListener= */ null); + cacheWriter.cache(); // Read the rest of the data. TestUtil.readToEnd(cacheDataSource); @@ -420,8 +434,15 @@ public void deleteCachedWhileReadingFromUpstreamWithReadOnlyCacheDataSourceDoesN // Cache the latter half of the data. int halfDataLength = 512; DataSpec dataSpec = buildDataSpec(halfDataLength, C.LENGTH_UNSET); - CacheUtil.cache( - cache, dataSpec, upstream, /* progressListener= */ null, /* isCanceled= */ null); + CacheWriter cacheWriter = + new CacheWriter( + new CacheDataSource(cache, upstream), + dataSpec, + /* allowShortContent= */ false, + /* isCanceled= */ null, + /* temporaryBuffer= */ null, + /* progressListener= */ null); + cacheWriter.cache(); // Create cache read-only CacheDataSource. CacheDataSource cacheDataSource = @@ -451,8 +472,15 @@ public void deleteCachedWhileReadingFromUpstreamWithBlockingCacheDataSourceDoesN // Cache the latter half of the data. int halfDataLength = 512; DataSpec dataSpec = buildDataSpec(/* position= */ 0, halfDataLength); - CacheUtil.cache( - cache, dataSpec, upstream, /* progressListener= */ null, /* isCanceled= */ null); + CacheWriter cacheWriter = + new CacheWriter( + new CacheDataSource(cache, upstream), + dataSpec, + /* allowShortContent= */ false, + /* isCanceled= */ null, + /* temporaryBuffer= */ null, + /* progressListener= */ null); + cacheWriter.cache(); // Create blocking CacheDataSource. CacheDataSource cacheDataSource = diff --git a/library/core/src/test/java/com/google/android/exoplayer2/upstream/cache/CacheUtilTest.java b/library/core/src/test/java/com/google/android/exoplayer2/upstream/cache/CacheWriterTest.java similarity index 71% rename from library/core/src/test/java/com/google/android/exoplayer2/upstream/cache/CacheUtilTest.java rename to library/core/src/test/java/com/google/android/exoplayer2/upstream/cache/CacheWriterTest.java index cc7a232b3f5..0e977565522 100644 --- a/library/core/src/test/java/com/google/android/exoplayer2/upstream/cache/CacheUtilTest.java +++ b/library/core/src/test/java/com/google/android/exoplayer2/upstream/cache/CacheWriterTest.java @@ -26,10 +26,11 @@ import com.google.android.exoplayer2.testutil.FakeDataSet; import com.google.android.exoplayer2.testutil.FakeDataSource; import com.google.android.exoplayer2.testutil.TestUtil; +import com.google.android.exoplayer2.upstream.DataSourceException; import com.google.android.exoplayer2.upstream.DataSpec; import com.google.android.exoplayer2.util.Util; -import java.io.EOFException; import java.io.File; +import java.io.IOException; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -38,9 +39,9 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; -/** Tests {@link CacheUtil}. */ +/** Unit tests for {@link CacheWriter}. */ @RunWith(AndroidJUnit4.class) -public final class CacheUtilTest { +public final class CacheWriterTest { /** * Abstract fake Cache implementation used by the test. This class must be public so Mockito can @@ -109,8 +110,16 @@ public void cache() throws Exception { FakeDataSource dataSource = new FakeDataSource(fakeDataSet); CachingCounters counters = new CachingCounters(); - CacheUtil.cache( - cache, new DataSpec(Uri.parse("test_data")), dataSource, counters, /* isCanceled= */ null); + + CacheWriter cacheWriter = + new CacheWriter( + new CacheDataSource(cache, dataSource), + new DataSpec(Uri.parse("test_data")), + /* allowShortContent= */ false, + /* isCanceled= */ null, + /* temporaryBuffer= */ null, + counters); + cacheWriter.cache(); counters.assertValues(0, 100, 100); assertCachedData(cache, fakeDataSet); @@ -124,12 +133,29 @@ public void cacheSetOffsetAndLength() throws Exception { Uri testUri = Uri.parse("test_data"); DataSpec dataSpec = new DataSpec(testUri, /* position= */ 10, /* length= */ 20); CachingCounters counters = new CachingCounters(); - CacheUtil.cache(cache, dataSpec, dataSource, counters, /* isCanceled= */ null); + + CacheWriter cacheWriter = + new CacheWriter( + new CacheDataSource(cache, dataSource), + dataSpec, + /* allowShortContent= */ false, + /* isCanceled= */ null, + /* temporaryBuffer= */ null, + counters); + cacheWriter.cache(); counters.assertValues(0, 20, 20); counters.reset(); - CacheUtil.cache(cache, new DataSpec(testUri), dataSource, counters, /* isCanceled= */ null); + cacheWriter = + new CacheWriter( + new CacheDataSource(cache, dataSource), + new DataSpec(testUri), + /* allowShortContent= */ false, + /* isCanceled= */ null, + /* temporaryBuffer= */ null, + counters); + cacheWriter.cache(); counters.assertValues(20, 80, 100); assertCachedData(cache, fakeDataSet); @@ -144,7 +170,16 @@ public void cacheUnknownLength() throws Exception { DataSpec dataSpec = new DataSpec(Uri.parse("test_data")); CachingCounters counters = new CachingCounters(); - CacheUtil.cache(cache, dataSpec, dataSource, counters, /* isCanceled= */ null); + + CacheWriter cacheWriter = + new CacheWriter( + new CacheDataSource(cache, dataSource), + dataSpec, + /* allowShortContent= */ false, + /* isCanceled= */ null, + /* temporaryBuffer= */ null, + counters); + cacheWriter.cache(); counters.assertValues(0, 100, 100); assertCachedData(cache, fakeDataSet); @@ -160,12 +195,29 @@ public void cacheUnknownLengthPartialCaching() throws Exception { Uri testUri = Uri.parse("test_data"); DataSpec dataSpec = new DataSpec(testUri, /* position= */ 10, /* length= */ 20); CachingCounters counters = new CachingCounters(); - CacheUtil.cache(cache, dataSpec, dataSource, counters, /* isCanceled= */ null); + + CacheWriter cacheWriter = + new CacheWriter( + new CacheDataSource(cache, dataSource), + dataSpec, + /* allowShortContent= */ false, + /* isCanceled= */ null, + /* temporaryBuffer= */ null, + counters); + cacheWriter.cache(); counters.assertValues(0, 20, 20); counters.reset(); - CacheUtil.cache(cache, new DataSpec(testUri), dataSource, counters, /* isCanceled= */ null); + cacheWriter = + new CacheWriter( + new CacheDataSource(cache, dataSource), + new DataSpec(testUri), + /* allowShortContent= */ false, + /* isCanceled= */ null, + /* temporaryBuffer= */ null, + counters); + cacheWriter.cache(); counters.assertValues(20, 80, 100); assertCachedData(cache, fakeDataSet); @@ -179,9 +231,18 @@ public void cacheLengthExceedsActualDataLength() throws Exception { Uri testUri = Uri.parse("test_data"); DataSpec dataSpec = new DataSpec(testUri, /* position= */ 0, /* length= */ 1000); CachingCounters counters = new CachingCounters(); - CacheUtil.cache(cache, dataSpec, dataSource, counters, /* isCanceled= */ null); - counters.assertValues(0, 100, 1000); + CacheWriter cacheWriter = + new CacheWriter( + new CacheDataSource(cache, dataSource), + dataSpec, + /* allowShortContent= */ true, + /* isCanceled= */ null, + /* temporaryBuffer= */ null, + counters); + cacheWriter.cache(); + + counters.assertValues(0, 100, 100); assertCachedData(cache, fakeDataSet); } @@ -194,16 +255,18 @@ public void cacheThrowEOFException() throws Exception { DataSpec dataSpec = new DataSpec(testUri, /* position= */ 0, /* length= */ 1000); try { - CacheUtil.cache( - new CacheDataSource(cache, dataSource), - dataSpec, - /* progressListener= */ null, - /* isCanceled= */ null, - /* enableEOFException= */ true, - /* temporaryBuffer= */ new byte[CacheUtil.DEFAULT_BUFFER_SIZE_BYTES]); + CacheWriter cacheWriter = + new CacheWriter( + new CacheDataSource(cache, dataSource), + dataSpec, + /* allowShortContent= */ false, + /* isCanceled= */ null, + /* temporaryBuffer= */ null, + /* progressListener= */ null); + cacheWriter.cache(); fail(); - } catch (EOFException e) { - // Do nothing. + } catch (IOException e) { + assertThat(DataSourceException.isCausedByPositionOutOfRange(e)).isTrue(); } } @@ -221,14 +284,21 @@ public void cachePolling() throws Exception { .endData(); FakeDataSource dataSource = new FakeDataSource(fakeDataSet); - CacheUtil.cache( - cache, new DataSpec(Uri.parse("test_data")), dataSource, counters, /* isCanceled= */ null); + CacheWriter cacheWriter = + new CacheWriter( + new CacheDataSource(cache, dataSource), + new DataSpec(Uri.parse("test_data")), + /* allowShortContent= */ false, + /* isCanceled= */ null, + /* temporaryBuffer= */ null, + counters); + cacheWriter.cache(); counters.assertValues(0, 300, 300); assertCachedData(cache, fakeDataSet); } - private static final class CachingCounters implements CacheUtil.ProgressListener { + private static final class CachingCounters implements CacheWriter.ProgressListener { private long contentLength = C.LENGTH_UNSET; private long bytesAlreadyCached;