Skip to content

Commit

Permalink
Reworked the coursier resolver retry logic (#1772)
Browse files Browse the repository at this point in the history
  • Loading branch information
lefou authored Mar 10, 2022
1 parent 4667ad6 commit 9f923fe
Showing 1 changed file with 69 additions and 47 deletions.
116 changes: 69 additions & 47 deletions main/src/mill/modules/Jvm.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package mill.modules

import coursier.cache.ArtifactError

import java.io.{
ByteArrayInputStream,
File,
Expand Down Expand Up @@ -29,10 +31,13 @@ import scala.util.Using
import mill.BuildInfo
import upickle.default.{ReadWriter => RW}

import java.util.function.Consumer
import scala.annotation.tailrec

object Jvm {

private val ConcurrentRetryCount = 5
private val ConcurrentRetryWait = 100

/**
* Runs a JVM subprocess with the given configuration and returns a
* [[os.CommandResult]] with it's aggregated output and error streams
Expand Down Expand Up @@ -101,8 +106,8 @@ object Jvm {
val passingJar = os.temp(prefix = "run-", suffix = ".jar", deleteOnExit = false)
ctx.log.debug(
s"Creating classpath passing jar '${passingJar}' with Class-Path: ${classPath.iterator.map(
_.toNIO.toUri().toURL().toExternalForm()
).mkString(" ")}"
_.toNIO.toUri().toURL().toExternalForm()
).mkString(" ")}"
)
createClasspathPassingJar(passingJar, classPath)
Agg(passingJar)
Expand Down Expand Up @@ -488,12 +493,12 @@ object Jvm {
universalScript(
shellCommands =
s"""exec java ${jvmArgs.mkString(" ")} $$JAVA_OPTS -cp "${shellClassPath.mkString(
":"
)}" '$mainClass' "$$@"""",
":"
)}" '$mainClass' "$$@"""",
cmdCommands =
s"""java ${jvmArgs.mkString(" ")} %JAVA_OPTS% -cp "${cmdClassPath.mkString(
";"
)}" $mainClass %*""",
";"
)}" $mainClass %*""",
shebang = shebang
)
}
Expand Down Expand Up @@ -535,7 +540,9 @@ object Jvm {
mapDependencies: Option[Dependency => Dependency] = None,
customizer: Option[coursier.core.Resolution => coursier.core.Resolution] = None,
ctx: Option[mill.api.Ctx.Log] = None,
coursierCacheCustomizer: Option[coursier.cache.FileCache[Task] => coursier.cache.FileCache[Task]] = None
coursierCacheCustomizer: Option[
coursier.cache.FileCache[Task] => coursier.cache.FileCache[Task]
] = None
): Result[Agg[PathRef]] = {

val (_, resolution) = resolveDependenciesMetadata(
Expand Down Expand Up @@ -564,10 +571,11 @@ object Jvm {
} else {

val coursierCache0 = coursier.cache.FileCache[Task].noCredentials
val coursierCache = coursierCacheCustomizer.getOrElse(identity[coursier.cache.FileCache[Task]](_)).apply(coursierCache0)

def load(artifacts: Seq[coursier.util.Artifact]) = {
val coursierCache = coursierCacheCustomizer.getOrElse(
identity[coursier.cache.FileCache[Task]](_)
).apply(coursierCache0)

@tailrec def load(artifacts: Seq[coursier.util.Artifact], retry: Int = ConcurrentRetryCount): (Seq[ArtifactError], Seq[File]) = {
import scala.concurrent.ExecutionContext.Implicits.global
val loadedArtifacts = Gather[Task].gather(
for (a <- artifacts)
Expand All @@ -579,7 +587,15 @@ object Jvm {
case (true, Left(x)) if !x.notFound => x
}
val successes = loadedArtifacts.collect { case (_, Right(x)) => x }
(errors, successes)

if(retry > 0 && errors.exists(_.describe.contains("concurrent download"))) {
ctx.foreach(_.log.debug(
s"Detected a concurrent download issue in coursier. Attempting a retry (${retry} left)"
))
Thread.sleep(ConcurrentRetryWait)
load(artifacts, retry - 1)
}
else(errors, successes)
}

val sourceOrJar =
Expand All @@ -604,8 +620,10 @@ object Jvm {
successes.map(p => PathRef(os.Path(p), quick = true)).filter(_.path.ext == "jar")
)
} else {
val errorDetails = errors.map(e => s"${ammonite.util.Util.newLine} ${e.describe}").mkString
Result.Failure("Failed to load source dependencies" + errorDetails)
val errorDetails = errors.map(e => s"${System.lineSeparator()} ${e.describe}").mkString
Result.Failure(
s"Failed to load ${if (sources) "source " else ""}dependencies" + errorDetails
)
}
}
}
Expand All @@ -617,12 +635,14 @@ object Jvm {
mapDependencies: Option[Dependency => Dependency] = None,
customizer: Option[coursier.core.Resolution => coursier.core.Resolution] = None,
ctx: Option[mill.api.Ctx.Log] = None,
coursierCacheCustomizer: Option[coursier.cache.FileCache[Task] => coursier.cache.FileCache[Task]] = None
coursierCacheCustomizer: Option[
coursier.cache.FileCache[Task] => coursier.cache.FileCache[Task]
] = None
): (Seq[Dependency], Resolution) = {

val cachePolicies = coursier.cache.CacheDefaults.cachePolicies

val forceVersions = force
val forceVersions = force.iterator
.map(mapDependencies.getOrElse(identity[Dependency](_)))
.map { d => d.module -> d.version }
.toMap
Expand All @@ -644,31 +664,33 @@ object Jvm {
.withCachePolicies(cachePolicies)
.withLogger(l)
}
val coursierCache = coursierCacheCustomizer.getOrElse(identity[coursier.cache.FileCache[Task]](_)).apply(coursierCache0)
val coursierCache = coursierCacheCustomizer.getOrElse(
identity[coursier.cache.FileCache[Task]](_)
).apply(coursierCache0)

val fetches = coursierCache.fetchs

val fetch = coursier.core.ResolutionProcess.fetch(repositories, fetches.head, fetches.tail)

import scala.concurrent.ExecutionContext.Implicits.global

// Workaround for https://github.com/com-lihaoyi/mill/issues/1028
// retry snippet taken from: https://github.com/coursier/coursier/issues/2022#issuecomment-812553102
def fetchWithRetry(m: Int): Task[Resolution] = Task.tailRecM[Int, Resolution](m) { (n: Int) =>
if (n <= 0) start.process.run(fetch).map(Right(_))
else
start.process.run(fetch).attempt map {
case Right(r) => Right(r)
case Left(e) if e.getMessage.contains("concurrent download") =>
ctx.foreach(_.log.debug(
s"Detected a concurrent download issue in coursier, attempting a retry. Recovered exception message: ${e.getMessage}"
))
Thread.sleep(100)
Left(n - 1)
case Left(e) => throw e
}
@tailrec def retriedResolution(count: Int = ConcurrentRetryCount): Resolution = {
val resolution = start.process.run(fetch).unsafeRun()
if (
count > 0 &&
resolution.errors.nonEmpty &&
resolution.errors.exists(_._2.exists(_.contains("concurrent download")))
) {
ctx.foreach(_.log.debug(
s"Detected a concurrent download issue in coursier. Attempting a retry (${count} left)"
))
Thread.sleep(ConcurrentRetryWait)
retriedResolution(count - 1)
} else resolution
}
import scala.concurrent.ExecutionContext.Implicits.global
val resolution = fetchWithRetry(5).unsafeRun()

val resolution = retriedResolution()
(deps.iterator.to(Seq), resolution)
}

Expand Down Expand Up @@ -774,14 +796,14 @@ object Jvm {
"mill after 0.10.0"
)
def resolveDependencies(
repositories: Seq[Repository],
deps: IterableOnce[coursier.Dependency],
force: IterableOnce[coursier.Dependency],
sources: Boolean,
mapDependencies: Option[Dependency => Dependency],
customizer: Option[coursier.core.Resolution => coursier.core.Resolution],
ctx: Option[mill.api.Ctx.Log]
): Result[Agg[PathRef]] =
repositories: Seq[Repository],
deps: IterableOnce[coursier.Dependency],
force: IterableOnce[coursier.Dependency],
sources: Boolean,
mapDependencies: Option[Dependency => Dependency],
customizer: Option[coursier.core.Resolution => coursier.core.Resolution],
ctx: Option[mill.api.Ctx.Log]
): Result[Agg[PathRef]] =
resolveDependencies(
repositories = repositories,
deps = deps,
Expand All @@ -798,12 +820,12 @@ object Jvm {
"mill after 0.10.0"
)
def resolveDependenciesMetadata(
repositories: Seq[Repository],
deps: IterableOnce[coursier.Dependency],
force: IterableOnce[coursier.Dependency],
mapDependencies: Option[Dependency => Dependency],
customizer: Option[coursier.core.Resolution => coursier.core.Resolution],
ctx: Option[mill.api.Ctx.Log]
repositories: Seq[Repository],
deps: IterableOnce[coursier.Dependency],
force: IterableOnce[coursier.Dependency],
mapDependencies: Option[Dependency => Dependency],
customizer: Option[coursier.core.Resolution => coursier.core.Resolution],
ctx: Option[mill.api.Ctx.Log]
): (Seq[Dependency], Resolution) =
resolveDependenciesMetadata(
repositories = repositories,
Expand Down

0 comments on commit 9f923fe

Please sign in to comment.