Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Multi-threaded anime downloading #1248

Merged
merged 23 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
339b519
multi-thread download, I don't understand how translation resource wo…
Dec 31, 2023
cfc98ad
Update DownloadPreferences.kt
giorgionegro Dec 31, 2023
91e427d
multi-thread download, I don't understand how translation resource wo…
Dec 31, 2023
93f5640
Merge remote-tracking branch 'origin/master'
Dec 31, 2023
7c59e38
improved reliability in size decision, improved merge speed with bina…
Dec 31, 2023
75bca4d
improved reliability in size decision, improved merge speed with bina…
Dec 31, 2023
d1a8d76
Merge remote-tracking branch 'origin/master'
Jan 2, 2024
7a4bc1a
Create blank.yml
giorgionegro Jan 2, 2024
abf37d4
Delete .github/workflows/blank.yml
giorgionegro Jan 2, 2024
baca8e8
Removed unnecessary semicolons, hopefully now it will work
Jan 2, 2024
3ef1095
Merge remote-tracking branch 'origin/master'
Jan 2, 2024
4394266
ktlint format
Jan 2, 2024
5b90b0d
made merging more streamlined
Jan 9, 2024
aab9738
Merge branch 'masterbase'
giorgionegro Jan 12, 2024
88e058e
added some comments and reordered some code
giorgionegro Jan 12, 2024
e144552
fixed missing immutable map in settings screen
giorgionegro Jan 12, 2024
280c345
ktlint format
giorgionegro Jan 12, 2024
58570a8
Merge branch 'aniyomiorg:master' into master
giorgionegro Jan 14, 2024
6d78052
Merge remote-tracking branch 'origin/merge_upstream'
giorgionegro Jan 14, 2024
9e9a205
adapted multi threaded download to improve large files reliability
giorgionegro Jan 15, 2024
6ba3fa1
Merge branch 'aniyomiorg:master' into master
giorgionegro Jan 15, 2024
f83eeb7
fix: Reword some strings
jmir1 Jan 15, 2024
865ce78
fix: Revert whitespace changes
jmir1 Jan 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,24 @@ object SettingsDownloadScreen : SearchableSettings {
val allAnimeCategories by getAnimeCategories.subscribe().collectAsState(
initial = runBlocking { getAnimeCategories.await() },
)

val downloadPreferences = remember { Injekt.get<DownloadPreferences>() }
val basePreferences = remember { Injekt.get<BasePreferences>() }

return listOf(
Preference.PreferenceItem.SwitchPreference(
pref = downloadPreferences.downloadOnlyOverWifi(),
title = stringResource(MR.strings.connected_to_wifi),
),
Preference.PreferenceItem.SwitchPreference(
pref = downloadPreferences.multithreadingDownload(),
title = stringResource(MR.strings.multi_thread_download),
subtitle = stringResource(MR.strings.multi_thread_download_summary),
),
Preference.PreferenceItem.ListPreference(
pref = downloadPreferences.numberOfThreads(),
title = stringResource(MR.strings.multi_thread_download_threads_number),
subtitle = stringResource(MR.strings.multi_thread_download_threads_number_summary),
entries = (1..64).associateWith { it.toString() }.toImmutableMap(),
),
Preference.PreferenceItem.SwitchPreference(
pref = downloadPreferences.saveChaptersAsCBZ(),
title = stringResource(MR.strings.save_chapter_as_cbz),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import eu.kanade.tachiyomi.animesource.online.AnimeHttpSource
import eu.kanade.tachiyomi.data.download.anime.model.AnimeDownload
import eu.kanade.tachiyomi.data.library.anime.AnimeLibraryUpdateNotifier
import eu.kanade.tachiyomi.data.notification.NotificationHandler
import eu.kanade.tachiyomi.network.ProgressListener
import eu.kanade.tachiyomi.util.storage.DiskUtil
import eu.kanade.tachiyomi.util.storage.toFFmpegString
import kotlinx.coroutines.CoroutineScope
Expand Down Expand Up @@ -58,6 +59,7 @@ import uy.kohesive.injekt.api.get
import uy.kohesive.injekt.injectLazy
import java.util.Locale
import kotlin.coroutines.cancellation.CancellationException
import kotlin.math.min

/**
* This class is the one in charge of downloading episodes.
Expand Down Expand Up @@ -494,12 +496,15 @@ class AnimeDownloader(
video.status = Video.State.DOWNLOAD_IMAGE
video.progress = 0
var tries = 0
var forceSequential = false

// Define a suspend function to encapsulate the retry logic
suspend fun attemptDownload(): UniFile {
return try {
newDownload(video, download, tmpDir, filename)
newDownload(video, download, tmpDir, filename, forceSequential)
} catch (e: Exception) {
// If the download failed, try again in sequential mode
forceSequential = true
if (tries >= 2) throw e
tries++
delay((2 shl (tries - 1)) * 1000L)
Expand Down Expand Up @@ -596,11 +601,7 @@ class AnimeDownloader(
}
}

private fun getFFmpegOptions(
video: Video,
headerOptions: String,
ffmpegFilename: String,
): Array<String> {
private fun getFFmpegOptions(video: Video, headerOptions: String, ffmpegFilename: String): Array<String> {
val subtitleInputs = video.subtitleTracks.joinToString(" ", postfix = " ") {
"-i \"${it.url}\""
}
Expand Down Expand Up @@ -646,51 +647,211 @@ class AnimeDownloader(
return hours * 3600000L + minutes * 60000L + fullSeconds * 1000L + hundredths * 10L
}

private suspend fun multiPartDownload(
video: Video,
download: AnimeDownload,
tmpDir: UniFile,
filename: String,
): UniFile {
// first we fetch the size of the video
val size: Long = download.source.getVideoSize(video, 3)
if (size == -1L) {
throw Exception("Could not get video size")
}
var nParts = preferences.numberOfThreads().get()
var partSize = size.div(nParts)
if (partSize < 1024) {
nParts = size.div(1024).toInt()
partSize = size.div(nParts)
}
if (partSize < 1024) {
logcat(LogPriority.WARN) {
"Part size is too small, falling back to sequential download"
}
return newDownload(video, download, tmpDir, filename, true)
}
val partList = mutableListOf<UniFile>()
val rangeList = mutableListOf<Array<Long>>()
val partJobList = mutableListOf<Job>()

// clear the tmp dir, when pause/resume is implemented, this should be changed, we will need the old files to resume
tmpDir.listFiles()?.forEach { it.delete() }
// create the parts as start byte and end byte, when p/r we will need to check if the file exists and resume from there using existing file size
// a better way would be to create chunk of predefined size and use X workers to download them, but this is easier to implement
// if we will use the second method pause/resume will be easier to implement since we can discard the partially downloaded chunks without losing too much progress
for (i in 0 until nParts) {
val start = i * partSize
val end =
if (i == nParts - 1) {
size
} else {
(i + 1) * partSize - 1
}
rangeList.add(arrayOf(start, end))
val part = tmpDir.createFile("$filename.part$i.tmp")!!
partList.add(part)
}

var failed = false
val totalProgresses = mutableListOf<Int>()
totalProgresses.addAll(List(nParts) { 0 })

for (range in rangeList) {
val splitWeight = (range[1] - range[0]).toFloat() / size.toFloat()
// create a listener for each part, so we can update the progress generally
val listener =
object : ProgressListener {
override fun update(
bytesRead: Long,
contentLength: Long,
done: Boolean,
) {
val progress = (((bytesRead * 90 / (range[1] - range[0])) * splitWeight).toInt()) + 1
totalProgresses[rangeList.indexOf(range)] = progress
video.progress = min(totalProgresses.reduce(Int::plus), 90)
}
}

partJobList.add(
scope.launchIO {
try {
if (failed) throw Exception("Download failed")
// I don't know if this is "pausable", I suspect that will only write the bytes to the file when the download of the segment is complete
val response = download.source.getVideoChunk(video, range[0], range[1], listener)
val file =
tmpDir.findFile("$filename.part${rangeList.indexOf(range)}.tmp")
?: tmpDir.createFile("$filename.part${rangeList.indexOf(range)}.tmp")!!
// try to open the file and append the bytes
try {
response.body.source().use { source ->
file.openOutputStream(true).use { output ->
val sink = output.sink().buffer()
val buffer = ByteArray(4 * 1024)
var totalBytesRead = 0L
var bytesRead: Int
while (source.read(buffer).also { bytesRead = it }.toLong() != -1L) {
// Check if the download is paused, if so, wait
while (isPaused) {
delay(1000) // Wait for 1 second before checking again
}
// Write the bytes to the file
sink.write(buffer, 0, bytesRead)
sink.emitCompleteSegments()
totalBytesRead += bytesRead
}
}
}
} catch (e: Exception) {
response.close()
failed = true
throw e
}
} catch (e: Exception) {
failed = true
throw e
}
},
)
}

for (job in partJobList) {
job.join()
}
val partFiles =
(0 until nParts).toMutableList().map { i ->
tmpDir.findFile("$filename.part$i.tmp")
?: tmpDir.createFile("$filename.part$i.tmp")!!
}.toMutableList()
val mergeSize = 10 / (nParts - 1).toFloat()
try {
val finalPartFile = partFiles.first()
finalPartFile.openOutputStream().use { output ->
for (i in 1 until nParts) {
val partFile = partFiles[i]
partFile.openInputStream().use { input ->
output.write(input.readBytes())
}
video.progress = 90 + (mergeSize * (i)).toInt()
}
}
finalPartFile.renameTo("$filename.mp4")
} catch (e: Exception) {
logcat(LogPriority.ERROR) { e.message ?: "Unknown error" }
failed = true
}

for (i in 0 until nParts) {
val part =
tmpDir.findFile("$filename.part$i.tmp")
?: continue
part.delete()
}

if (failed) {
for (i in 0 until nParts) {
val part =
tmpDir.findFile("$filename.part$i.tmp")
?: tmpDir.createFile("$filename.part$i.tmp")!!
part.delete()
}
throw Exception("Download failed")
}
return tmpDir.findFile("$filename.mp4") ?: throw Exception("Download failed")
}

private suspend fun newDownload(
video: Video,
download: AnimeDownload,
tmpDir: UniFile,
filename: String,
forceSequential: Boolean,
): UniFile {
// Check if the download is paused before starting
while (isPaused) {
delay(1000) // This is a pause check delay, adjust the timing as needed.
}

if (isHls(video) || isMpd(video)) {
return ffmpegDownload(video, download, tmpDir, filename)
} else {
val response = download.source.getVideo(video)
val file = tmpDir.findFile("$filename.tmp") ?: tmpDir.createFile("$filename.tmp")!!
when {
isHls(video) || isMpd(video) -> {
return ffmpegDownload(video, download, tmpDir, filename)
}

// Write to file with pause/resume capability
try {
response.body.source().use { source ->
file.openOutputStream(true).use { output ->
val sink = output.sink().buffer()
val buffer = ByteArray(4 * 1024)
var totalBytesRead = 0L
var bytesRead: Int
while (source.read(buffer).also { bytesRead = it }.toLong() != -1L) {
// Check if the download is paused, if so, wait
while (isPaused) {
delay(1000) // Wait for 1 second before checking again
preferences.multithreadingDownload().get() && !forceSequential -> {
return multiPartDownload(video, download, tmpDir, filename)
}

else -> {
val response = download.source.getVideo(video)
val file = tmpDir.findFile("$filename.tmp") ?: tmpDir.createFile("$filename.tmp")!!
// Write to file with pause/resume capability
try {
response.body.source().use { source ->
file.openOutputStream(true).use { output ->
val sink = output.sink().buffer()
val buffer = ByteArray(4 * 1024)
var totalBytesRead = 0L
var bytesRead: Int
while (source.read(buffer).also { bytesRead = it }.toLong() != -1L) {
// Check if the download is paused, if so, wait
while (isPaused) {
delay(1000) // Wait for 1 second before checking again
}
// Write the bytes to the file
sink.write(buffer, 0, bytesRead)
sink.emitCompleteSegments()
totalBytesRead += bytesRead
// Update progress here if needed
}
// Write the bytes to the file
sink.write(buffer, 0, bytesRead)
sink.emitCompleteSegments()
totalBytesRead += bytesRead
// Update progress here if needed
}
}
// After download is complete, rename the file to its final name
file.renameTo("$filename.mp4")
return file
} catch (e: Exception) {
response.close()
if (!queueState.value.equals(download)) file.delete()
throw e
}
// After download is complete, rename the file to its final name
file.renameTo("$filename.mp4")
return file
} catch (e: Exception) {
response.close()
if (!queueState.value.equals(download)) file.delete()
throw e
}
}
}
Expand Down Expand Up @@ -835,10 +996,7 @@ class AnimeDownloader(
}
}

private fun setProgressSubject(
video: Video?,
subject: PublishSubject<Video.State>?,
) {
private fun setProgressSubject(video: Video?, subject: PublishSubject<Video.State>?) {
video?.progressSubject = subject
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,7 @@ class DownloadPreferences(
)

fun numberOfDownloads() = preferenceStore.getInt("download_slots", 1)
fun multithreadingDownload() = preferenceStore.getBoolean("multi_part_download", false)

fun numberOfThreads() = preferenceStore.getInt("download_threads", 4)
}
9 changes: 7 additions & 2 deletions i18n/src/commonMain/resources/MR/base/strings.xml
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@
<string name="onboarding_guides_returning_user">Reinstalling %s?</string>

<!-- Preferences -->
<!-- Subsections -->
<!-- Subsections -->
<string name="pref_category_general">General</string>
<string name="pref_category_appearance">Appearance</string>
<string name="pref_category_library">Library</string>
Expand Down Expand Up @@ -249,6 +249,11 @@
<string name="update_weekly">Weekly</string>
<string name="pref_library_update_restriction">Automatic updates device restrictions</string>
<string name="connected_to_wifi">Only on Wi-Fi</string>
<string name="multi_thread_download">Multi-threaded downloading for anime</string>
<string name="multi_thread_download_summary">Enable downloading anime using multiple threads</string>
<!-- multi_thread_download_threads_number-->
<string name="multi_thread_download_threads_number">Thread count</string>
<string name="multi_thread_download_threads_number_summary">Number of threads to use for downloading, might get your IP blocked if too high, usually 4 is a good number to avoid heavy load on source servers.</string>
<string name="network_not_metered">Only on unmetered network</string>
<string name="charging">When charging</string>
<string name="restrictions">Restrictions: %s</string>
Expand Down Expand Up @@ -793,4 +798,4 @@
<string name="exception_http">HTTP %d, check website in WebView</string>
<string name="exception_offline">No Internet connection</string>
<string name="exception_unknown_host">Couldn\'t reach %s</string>
</resources>
</resources>
Loading