Skip to content

Commit

Permalink
Catch even more coursier concurrency glitches, some consolidations (#…
Browse files Browse the repository at this point in the history
…2112)

This is an extension to our workarounds to coursier concurrency issues.

We handle some issues, already detected by coursier and reported as a
result, as well as one which is not detected by coursier, so we
try-catch it and try to match based on the exception type and the
message.

As it is hard to reproduce this kind of issue, I created a GitHub
Actions workflow, which consistently failed without this very PR in step
"1. Run Mill with settings that should stress coursier". After I applied
this PR, all steps passed. That's not a 100-percent guarantee, but it
means we're probably on the right track.

```yaml
name: Stress test coursier downloads to reproduce concurrency issues

on:
  push:
  pull_request:

jobs:

  test-coursier:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v3
        with:
          fetch-depth: 0

      - uses: actions/setup-java@v3
        with:
          java-version: 11
          distribution: temurin

      - name: "Install local Mill into target and ivy cache"
        run: ./mill -i installLocal

      - name: "1. Cleanup courier cache"
        run: rm -r $HOME/.cache/coursier

      - name: "1. Run Mill with settings that should stress coursier"
        run: target/mill-release -i -j 20 -d __.prepareOffline

      - name: "2. Cleanup courier cache"
        run: rm -r $HOME/.cache/coursier

      - name: "2. Run Mill with settings that should stress coursier"
        run: target/mill-release -i -j 20 -d __.prepareOffline

```

Pull request: #2112
  • Loading branch information
lefou authored Nov 9, 2022
1 parent 2679031 commit d1368a3
Showing 1 changed file with 64 additions and 37 deletions.
101 changes: 64 additions & 37 deletions main/src/mill/modules/Jvm.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import java.io.{
}
import java.lang.reflect.Modifier
import java.net.URI
import java.nio.file.{FileSystems, Files, StandardOpenOption}
import java.nio.file.{FileSystems, Files, NoSuchFileException, StandardOpenOption}
import java.nio.file.attribute.PosixFilePermission
import java.util.jar.{Attributes, JarEntry, JarFile, JarOutputStream, Manifest}
import coursier.{Dependency, Repository, Resolution}
Expand All @@ -27,7 +27,7 @@ import mill.modules.Assembly.{AppendEntry, WriteOnceEntry}
import scala.collection.mutable
import scala.util.Properties.isWin
import scala.jdk.CollectionConverters._
import scala.util.Using
import scala.util.{Failure, Success, Try, Using}
import mill.BuildInfo
import os.SubProcess
import upickle.default.{ReadWriter => RW}
Expand Down Expand Up @@ -531,6 +531,54 @@ object Jvm {
PathRef(outputPath)
}

/**
 * Somewhat generic way to retry some action and a workaround for
 * https://github.com/com-lihaoyi/mill/issues/1028
 *
 * Specifically built for coursier API interactions, which are known to have some
 * concurrency issues that we handle on a known-case basis. An attempt is retried when
 *  - `f` throws a `NoSuchFileException` whose message mentions `__sha1.computed`, or
 *  - `f` succeeds, but `errorMsgExtractor` yields a message containing
 *    `concurrent download` or `checksum not found`.
 *
 * Any other outcome, and any outcome once `retryCount` is exhausted, is passed through
 * unchanged: a successful result is returned, a failure is re-thrown.
 *
 * @param retryCount The max retry count
 * @param ctx The context to use to show log messages (if defined)
 * @param errorMsgExtractor A generic way to get the error messages of a run of `f`
 * @param f The actual operation to retry, if it results in a known concurrency error
 * @tparam T The result type of the computation
 * @return The result of the computation. If the computation was retried and finally
 *         succeeded, previously occurred errors will not be included in the result.
 */
@tailrec
private def retry[T](
    retryCount: Int = CoursierRetryCount,
    ctx: Option[Ctx.Log],
    errorMsgExtractor: T => Seq[String]
)(f: () => T): T = {
  val tried = Try(f())

  // Name of the detected, retry-worthy issue (as used in the log message), if any.
  val retryReason: Option[String] =
    if (retryCount <= 0) None
    else tried match {
      // This one is not detected by coursier itself, so we try-catch handle it.
      // I assume this happens when another coursier thread already moved or renamed
      // the temporary file.
      // `getMessage` may be null, hence the `Option` wrapper: a message-less exception
      // must be re-thrown as is instead of being masked by a NullPointerException.
      case Failure(e: NoSuchFileException)
          if Option(e.getMessage).exists(_.contains("__sha1.computed")) =>
        Some("concurrent download")
      // These are detected by coursier and reported as part of the result.
      case Success(res) =>
        val errors = errorMsgExtractor(res)
        if (errors.exists(_.contains("concurrent download"))) Some("concurrent download")
        else if (errors.exists(_.contains("checksum not found"))) Some("checksum download")
        else None
      case _ => None
    }

  retryReason match {
    case Some(issue) =>
      ctx.foreach(_.log.debug(
        s"Detected a ${issue} issue in coursier. Attempting a retry (${retryCount} left)"
      ))
      Thread.sleep(CoursierRetryWait)
      retry(retryCount - 1, ctx, errorMsgExtractor)(f)
    case None =>
      // Either a clean result, an unknown failure or no retries left: pass it through.
      tried.get
  }
}

/**
* Resolve dependencies using Coursier.
*
Expand Down Expand Up @@ -581,10 +629,7 @@ object Jvm {
identity[coursier.cache.FileCache[Task]](_)
).apply(coursierCache0)

@tailrec def load(
artifacts: Seq[coursier.util.Artifact],
retry: Int = CoursierRetryCount
): (Seq[ArtifactError], Seq[File]) = {
def load(artifacts: Seq[coursier.util.Artifact]): (Seq[ArtifactError], Seq[File]) = {
import scala.concurrent.ExecutionContext.Implicits.global
val loadedArtifacts = Gather[Task].gather(
for (a <- artifacts)
Expand All @@ -597,20 +642,7 @@ object Jvm {
}
val successes = loadedArtifacts.collect { case (_, Right(x)) => x }

if (retry > 0 && errors.exists(e => e.describe.contains("concurrent download"))) {
ctx.foreach(_.log.debug(
s"Detected a concurrent download issue in coursier. Attempting a retry (${retry} left)"
))
Thread.sleep(CoursierRetryWait)
load(artifacts, retry - 1)
} else if (retry > 0 && errors.exists(e => e.describe.contains("checksum not found"))) {
ctx.foreach(_.log.debug(
s"Detected a checksum download issue in coursier. Attempting a retry (${retry} left)"
))
Thread.sleep(CoursierRetryWait)
load(artifacts, retry - 1)

} else (errors, successes)
(errors, successes)
}

val sourceOrJar =
Expand All @@ -629,7 +661,14 @@ object Jvm {
coursier.Type("maven-plugin")
)
)
val (errors, successes) = load(sourceOrJar)

val (errors, successes) = retry(
ctx = ctx,
errorMsgExtractor = (res: (Seq[ArtifactError], Seq[File])) => res._1.map(_.describe)
) {
() => load(sourceOrJar)
}

if (errors.isEmpty) {
mill.Agg.from(
successes.map(p => PathRef(os.Path(p), quick = true)).filter(_.path.ext == "jar")
Expand Down Expand Up @@ -689,23 +728,11 @@ object Jvm {

import scala.concurrent.ExecutionContext.Implicits.global

// Workaround for https://github.com/com-lihaoyi/mill/issues/1028
@tailrec def retriedResolution(count: Int = CoursierRetryCount): Resolution = {
  // Run one resolution attempt.
  val attempt = start.process.run(fetch).unsafeRun()
  // Only inspect the reported errors while there are retries left.
  val shouldRetry = count > 0 && attempt.errors.exists {
    case (_, messages) => messages.exists(_.contains("concurrent download"))
  }
  if (!shouldRetry) attempt
  else {
    ctx.foreach(_.log.debug(
      s"Detected a concurrent download issue in coursier. Attempting a retry (${count} left)"
    ))
    Thread.sleep(CoursierRetryWait)
    retriedResolution(count - 1)
  }
}
val resolution =
retry(ctx = ctx, errorMsgExtractor = (r: Resolution) => r.errors.flatMap(_._2)) {
() => start.process.run(fetch).unsafeRun()
}

val resolution = retriedResolution()
(deps.iterator.to(Seq), resolution)
}

Expand Down

0 comments on commit d1368a3

Please sign in to comment.