Skip to content

Commit

Permalink
Implemented AutoCloseable Workers (com-lihaoyi#1781)
Browse files Browse the repository at this point in the history
The idea is, to automatically call the `java.lang.AutoCloseable.close()` method on workers than gets dropped, to provide a chance to clean up and free resources.

I also applied some minor improvements on the go. E.g. synchronized access to `Evaluator.workerCache` now synchronizes on the `workerCache`, not on the `Evaluator` instance.

I also added test cases to test `AutoCloseable` workers, sequentially and parallel.

Updated documentation.

This PR implements this idea:

*  com-lihaoyi#1780

Pull request: com-lihaoyi#1781
  • Loading branch information
lefou authored Mar 16, 2022
1 parent 7abaaef commit 4103b16
Show file tree
Hide file tree
Showing 4 changed files with 262 additions and 88 deletions.
21 changes: 21 additions & 0 deletions docs/antora/modules/ROOT/pages/Tasks.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,27 @@ mutable state, and it is up to the implementation to ensure that this mutable
state is only used for caching/performance and does not affect the
externally-visible behavior of the worker.

=== `Autoclosable` Workers

As <<Workers>> may also hold limited resources, it may be necessary to free up these resources once a worker is no longer needed.
This is especially the case, when your worker tasks depends on other tasks and these tasks change, as Mill will then also create a new worker instance.

To implement resource cleanup, your worker can implement `java.lang.AutoCloseable`.
Once the worker is no longer needed, Mill will call the `close()` method on it before any newer version of this worker is created.

[source,scala]
----
import mill._
import java.lang.AutoCloseable
def myWorker = T.worker {
new MyWorker with AutoCloseable {
// ...
override def close() = { /* cleanup and free resources */ }
}
}
----

== Task Cheat Sheet

The following table might help you make sense of the small collection of
Expand Down
53 changes: 41 additions & 12 deletions main/core/src/mill/eval/Evaluator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ case class Labelled[T](task: NamedTask[T], segments: Segments) {
/**
* Evaluate tasks.
*/
class Evaluator private (
class Evaluator private[Evaluator] (
_home: os.Path,
_outPath: os.Path,
_externalOutPath: os.Path,
Expand Down Expand Up @@ -170,7 +170,6 @@ class Evaluator private (
testReporter: TestReporter = DummyTestReporter
): Evaluator.Results = {
val (sortedGroups, transitive) = Evaluator.plan(goals)

val evaluated = new Agg.Mutable[Task[_]]
val results = mutable.LinkedHashMap.empty[Task[_], mill.api.Result[(Any, Int)]]
var someTaskFailed: Boolean = false
Expand All @@ -190,9 +189,10 @@ class Evaluator private (
)

val startTime = System.currentTimeMillis()
// Increment the counter message by 1 to go from 1/10 to 10/10 instead of 0/10 to 9/10

// Increment the counter message by 1 to go from 1/10 to 10/10 instead of 0/10 to 9/10
val counterMsg = (i + 1) + "/" + sortedGroups.keyCount

val Evaluated(newResults, newEvaluated, cached) = evaluateGroupCached(
terminal = terminal,
group = group,
Expand Down Expand Up @@ -386,7 +386,7 @@ class Evaluator private (
externalClassLoaderSigHash + scriptsHash
} else {
// We fallback to the old mechanism when the importTree was not populated
classLoaderSignHash
classLoaderSig.hashCode()
}

val inputsHash = externalInputsHash + sideHashes + classLoaderSigHash
Expand All @@ -405,6 +405,7 @@ class Evaluator private (
logger
)
Evaluated(newResults, newEvaluated.toSeq, false)

case lntRight @ Right(labelledNamedTask) =>
val out =
if (!labelledNamedTask.task.ctx.external) outPath
Expand All @@ -426,11 +427,35 @@ class Evaluator private (
catch { case e: Throwable => None }
} yield (parsed, cached.valueHash)

val workerCached: Option[Any] = labelledNamedTask.task.asWorker
.flatMap { w => synchronized { workerCache.get(w.ctx.segments) } }
.collect { case (`inputsHash`, v) => v }
val previousWorker = labelledNamedTask.task.asWorker.flatMap { w =>
workerCache.synchronized { workerCache.get(w.ctx.segments) }
}
val upToDateWorker: Option[Any] = previousWorker.flatMap {
case (`inputsHash`, upToDate) =>
// worker cached and up-to-date
Some(upToDate)
case (_, obsolete: AutoCloseable) =>
// worker cached but obsolete, needs to be closed
try {
logger.debug(s"Closing previous worker: ${labelledNamedTask.segments.render}")
obsolete.close()
} catch {
case NonFatal(e) =>
logger.error(
s"${labelledNamedTask.segments.render}: Errors while closing obsolete worker: ${e.getMessage()}"
)
}
// make sure, we can no longer re-use a closed worker
labelledNamedTask.task.asWorker.foreach { w =>
workerCache.synchronized { workerCache.remove(w.ctx.segments) }
}
None
case _ =>
// worker not cached or obsolete
None
}

workerCached.map((_, inputsHash)) orElse cached match {
upToDateWorker.map((_, inputsHash)) orElse cached match {
case Some((v, hashCode)) =>
val newResults = mutable.LinkedHashMap.empty[Task[_], mill.api.Result[(Any, Int)]]
newResults(labelledNamedTask.task) = mill.api.Result.Success((v, hashCode))
Expand Down Expand Up @@ -494,9 +519,12 @@ class Evaluator private (
metaPath: os.Path,
inputsHash: Int,
labelledNamedTask: Labelled[_]
) = {
): Unit = {
labelledNamedTask.task.asWorker match {
case Some(w) => synchronized { workerCache(w.ctx.segments) = (inputsHash, v) }
case Some(w) =>
workerCache.synchronized {
workerCache.update(w.ctx.segments, (inputsHash, v))
}
case None =>
val terminalResult = labelledNamedTask
.writer
Expand Down Expand Up @@ -864,14 +892,15 @@ object Evaluator {
val topoSorted = Graph.topoSorted(transitive)
val seen = collection.mutable.Set.empty[Segments]
val overridden = collection.mutable.Set.empty[Task[_]]
topoSorted.values.reverse.foreach {
topoSorted.values.reverse.iterator.foreach {
case x: NamedTask[_] =>
if (!seen.contains(x.ctx.segments)) seen.add(x.ctx.segments)
else overridden.add(x)
case _ => // donothing
}

val sortedGroups = Graph.groupAroundImportantTargets(topoSorted) {
// important: all named tasks and those explicitly requested
case t: NamedTask[Any] =>
val segments = t.ctx.segments
Right(
Expand Down Expand Up @@ -1036,7 +1065,7 @@ object Evaluator {
Seq.empty
)

@deprecated(message = "Pattern matching not supported with EvaluatorState", since = "mill 0.10.1")
@deprecated(message = "Pattern matching not supported with Evaluator", since = "mill 0.10.1")
def unapply(evaluator: Evaluator): Option[(
os.Path,
os.Path,
Expand Down
Loading

0 comments on commit 4103b16

Please sign in to comment.