Skip to content

Commit

Permalink
[WIP] Add ability to watch T.inputs and interp.watchValues (#2489)
Browse files Browse the repository at this point in the history
Fixes #838 and also fixes
#909

We add the ability to re-run `T.input{...}` blocks during the polling
loop, just like we re-stat the `os.Paths` reported by `T.source` and
`T.sources`. This involves some complexity, since `T.input{...}` blocks
are not just a single lambda but instead a wrapper for a part of the
`Task` graph that feeds into it. We capture the body of `evaluateGroup`
in a lambda so we can call it later

# Major Changes

1. Modify `Evaluator.Results#results` from a `Map[Task, Result]` to
`Map[Task, TaskResult]`, where `TaskResult[T]` is a wrapper of
`Result[T]` that also contains a `recalcOpt: Option[() => Result[T]]`
callback

2. Wrap the bulk of `Evaluator#evaluateGroup` in a function, that we
evaluate once to populate `TaskResult#result` and store in a callback to
populate `TaskResult#recalcOpt`

3. Update `RunScript.evaluateNamed` to handle `InputImpl`s, in addition
to the current handling of `SourceImpl`s and `SourcesImpl`s, and
translate the `recalcOpt` callback into a `Watchable.Value`

4. The `Watchable.Value` is then propagated to
`MillBuildBootstrap.evaluateWithWatches`, which ends up feeding it into
`Watching.scala` where it is processed the same way as the
`Watchable.Value`s we get from `interp.watchValue`

5. Overhauled the implementation of `show`; rather than returning a
`Watched` wrapper, we now call back to `interp.evalWatch0` to register
the things we need to watch. I also consolidated the common code in
`show` and `showNamed` into one shared helper.

6. I wrapped most of the `Any`s or `_`s in `Evaluator` with a new `case
class Val(value: Any)` wrapper. This should considerably increase type
safety: e.g. you can no longer pass an `Option[Val]` where a `Val` was
expected, unless you manually wrap it in `Val(...)` which is hard to do
accidentally. This makes it much easier to ensure the various data
structures inside `Evaluator` are passed around correctly and not
mis-used

# Minor Changes

1. Cleaned up `RunScripts` considerably, now there's only two methods
left and no copy-pasty code

2. Removed some `T.input`s from `MillBuildRootModule`: `def
unmanagedClasspath` can be a `T.sources`, and `def parseBuildFiles` is
now a normal target that depends on `allBuildFiles` which is a
`T.sources`. That should improve the `--watch` status message in the
terminal and reduce the amount of work that needs to be done every 100ms
while polling for changes

# Testing 

Tested manually with the following `build.sc`; verified with `-w bar`
that `bar` re-runs every 2s, and the script re-evaluates every 10s

```scala
import mill._

interp.watchValue(System.currentTimeMillis() / 10000)

println("Setting up build.sc")
def foo = T.input{ System.currentTimeMillis() / 2000 }

def bar = T{ foo() + " seconds since the epoch" }
```

Also added `integration/feature/watch-source-input/` suite that starts
up a `--watch` in a background thread, makes changes to the files on
disk, and ensures the `build.sc` and tasks within it re-evaluate as
expected.

# Notes

1. I'm not sure if the new way `show` registers watches is better than
the old one. The old way with `Watched` can be made to work even in
failure cases if we pass it as the optional second parameter to
`Result.Failure`

2. The `Val` change is not strictly necessary, and is pretty invasive.
But it was very hard to make the necessary changes to `Evaluator`
without it due to bugs passing the wrong `Any`s around. So I think it's
a reasonable time to introduce `Val` to support future work inside
`Evaluator` with increased compiler support and type safety

3. The `recalc` logic would be a lot simpler if `input`s couldn't have
upstream tasks, since we could just re-run the block rather than
re-evaluating the task group. I had some thoughts about restricting
`T.input`s to allow this, but couldn't convince myself that was the
right thing to do, so left the `T.input` semantics in place and just
made it work by capturing a callback that can evaluate the entire task
group
  • Loading branch information
lihaoyi authored May 4, 2023
1 parent 14e1615 commit bde3d59
Show file tree
Hide file tree
Showing 32 changed files with 595 additions and 357 deletions.
2 changes: 1 addition & 1 deletion bsp/worker/src/mill/bsp/worker/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ object Utils {
results: Evaluator.Results,
task: mill.define.Task[_]
): StatusCode = {
results.results(task) match {
results.results(task).result match {
case Success(_) => StatusCode.OK
case Skipped => StatusCode.CANCELLED
case _ => StatusCode.ERROR
Expand Down
19 changes: 17 additions & 2 deletions ci/mill-bootstrap.patch
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
diff --git a/build.sc b/build.sc
index 0f664c99c3..7d8788e7a8 100644
index 0f664c99c3..d738fbe351 100644
--- a/build.sc
+++ b/build.sc
@@ -19,17 +19,18 @@ import com.github.lolgab.mill.mima.{
Expand Down Expand Up @@ -518,7 +518,7 @@ index 0f664c99c3..7d8788e7a8 100644
os.copy(examplePath, T.dest / exampleStr, createFolders = true)
os.copy(bootstrapLauncher().path, T.dest / exampleStr / "mill")
val zip = T.dest / s"$exampleStr.zip"
@@ -2018,47 +1735,6 @@ def exampleZips: Target[Seq[PathRef]] = T {
@@ -2018,51 +1735,10 @@ def exampleZips: Target[Seq[PathRef]] = T {
}

def uploadToGithub(authKey: String) = T.command {
Expand Down Expand Up @@ -566,3 +566,18 @@ index 0f664c99c3..7d8788e7a8 100644
}

def validate(ev: Evaluator): Command[Unit] = T.command {
- T.task(MainModule.evaluateTasks(
+ mill.main.RunScript.evaluateTasksNamed(
ev.withFailFast(false),
Seq(
"__.compile",
@@ -2075,7 +1751,8 @@ def validate(ev: Evaluator): Command[Unit] = T.command {
"docs.localPages"
),
selectMode = SelectMode.Separated
- )(identity))()
+ )
+
()
}

2 changes: 1 addition & 1 deletion example/src/mill/integration/ExampleTestSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ object ExampleTestSuite extends IntegrationTestSuite {
BashTokenizer.tokenize(commandStr) match {
case Seq(s"./$command", rest @ _*) =>
val evalResult = command match {
case "mill" => evalStdout(rest: _*)
case "mill" => evalStdout(rest)
case cmd =>
val tokens = cmd +: rest
val executable = workspaceRoot / os.RelPath(tokens.head)
Expand Down
6 changes: 3 additions & 3 deletions example/tasks/2-primary-tasks/build.sc
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ def lineCount: T[Int] = T {
> ./mill show lineCount
Computing line count
18
16
> ./mill show lineCount # line count already cached, doesn't need to be computed
18
16
*/

Expand Down Expand Up @@ -111,7 +111,7 @@ def hugeFileName = T{
/** Usage
> ./mill show lineCount
18
16
> ./mill show hugeFileName # This still runs `largestFile` even though `lineCount() < 999`
Finding Largest File
Expand Down
2 changes: 0 additions & 2 deletions example/tasks/2-primary-tasks/src/Foo.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,4 @@ public static void main(String[] args) throws IOException{
System.out.println("foo.txt resource: " + br.readLine());
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ object MultiLevelBuildTests extends IntegrationTestSuite {
*/
def checkWatchedFiles(expected0: Seq[os.Path]*) = {
for ((expectedWatched0, (frame, path)) <- expected0.zip(loadFrames(expected0.length))) {
val frameWatched = frame.evalWatched.map(_.path).sorted
val frameWatched = frame
.evalWatched
.map(_.path)
.sorted.filter(_.startsWith(wsRoot))
.filter(!_.segments.contains("mill-launcher"))

val expectedWatched = expectedWatched0.sorted
assert(frameWatched == expectedWatched)
}
Expand Down
1 change: 1 addition & 0 deletions integration/feature/watch-source-input/repo/bar.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
initial-bar
1 change: 1 addition & 0 deletions integration/feature/watch-source-input/repo/baz.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
initial-baz
47 changes: 47 additions & 0 deletions integration/feature/watch-source-input/repo/build.sc
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import mill._

println("Setting up build.sc")

def foo = T.sources(millSourcePath / "foo1.txt", millSourcePath / "foo2.txt")
def bar = T.source(millSourcePath / "bar.txt")

def qux = T{
val fooMsg = "Running qux foo contents " + foo().map(p => os.read(p.path)).mkString(" ")
println(fooMsg)

val barMsg = "Running qux bar contents " + os.read(bar().path)
println(barMsg)

writeCompletionMarker("quxRan")

fooMsg + " " + barMsg
}

interp.watchValue(PathRef(millSourcePath / "watchValue.txt"))

def baz = T.input(PathRef(millSourcePath / "baz.txt"))

def lol = T{
val barMsg = "Running lol baz contents " + os.read(baz().path)
println(barMsg)

writeCompletionMarker("lolRan")

barMsg
}


def writeCompletionMarker(name: String) = {

Range(0, 10)
.map(i => os.pwd / "out" / s"$name$i")
.find(!os.exists(_))
.foreach(os.write(_, ""))
}

writeCompletionMarker("initialized")

if (os.read(millSourcePath / "watchValue.txt").contains("exit")){
Thread.sleep(1000)
System.exit(0)
}
1 change: 1 addition & 0 deletions integration/feature/watch-source-input/repo/foo1.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
initial-foo1
1 change: 1 addition & 0 deletions integration/feature/watch-source-input/repo/foo2.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
initial-foo2
1 change: 1 addition & 0 deletions integration/feature/watch-source-input/repo/watchValue.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
initial-watchValue2
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package mill.integration

import utest._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.duration.SECONDS
import scala.concurrent.ExecutionContext.Implicits.global

/**
* Test to make sure that `--watch` works in the following cases:
*
* 1. `T.source`
* 2. `T.sources`
* 3. `T.input`
* 4. `interp.watchValue`
* 5. Implicitly watched files, like `build.sc`
*/
object WatchSourceInputTests extends IntegrationTestSuite {

val maxDuration = 30000
val tests = Tests {
val wsRoot = initWorkspace()

def awaitCompletionMarker(name: String) = {
val maxTime = System.currentTimeMillis() + maxDuration
while (!os.exists(wsRoot / "out" / name)) {
if (System.currentTimeMillis() > maxTime) {
sys.error(s"awaitCompletionMarker($name) timed out")
}
Thread.sleep(100)
}
}

def testWatchSource(show: Boolean) = {
val showArgs = if (show) Seq("show") else Nil

val evalResult = Future { evalTimeoutStdout(maxDuration, "--watch", showArgs, "qux") }
val expectedPrints = collection.mutable.Buffer.empty[String]
val expectedShows = collection.mutable.Buffer.empty[String]

awaitCompletionMarker("initialized0")
awaitCompletionMarker("quxRan0")
expectedPrints.append(
"Setting up build.sc",
"Running qux foo contents initial-foo1 initial-foo2",
"Running qux bar contents initial-bar"
)
expectedShows.append(
"Running qux foo contents initial-foo1 initial-foo2 Running qux bar contents initial-bar"
)

os.write.over(wsRoot / "foo1.txt", "edited-foo1")
awaitCompletionMarker("quxRan1")
expectedPrints.append(
"Running qux foo contents edited-foo1 initial-foo2",
"Running qux bar contents initial-bar"
)
expectedShows.append(
"Running qux foo contents edited-foo1 initial-foo2 Running qux bar contents initial-bar"
)

os.write.over(wsRoot / "foo2.txt", "edited-foo2")
awaitCompletionMarker("quxRan2")
expectedPrints.append(
"Running qux foo contents edited-foo1 edited-foo2",
"Running qux bar contents initial-bar"
)
expectedShows.append(
"Running qux foo contents edited-foo1 edited-foo2 Running qux bar contents initial-bar"
)

os.write.over(wsRoot / "bar.txt", "edited-bar")
awaitCompletionMarker("quxRan3")
expectedPrints.append(
"Running qux foo contents edited-foo1 edited-foo2",
"Running qux bar contents edited-bar"
)
expectedShows.append(
"Running qux foo contents edited-foo1 edited-foo2 Running qux bar contents edited-bar"
)

os.write.append(wsRoot / "build.sc", "\ndef unrelated = true")
awaitCompletionMarker("initialized1")
expectedPrints.append(
"Setting up build.sc",
"Running qux foo contents edited-foo1 edited-foo2",
"Running qux bar contents edited-bar"
)
expectedShows.append(
"Running qux foo contents edited-foo1 edited-foo2 Running qux bar contents edited-bar"
)

os.write.over(wsRoot / "watchValue.txt", "exit")
awaitCompletionMarker("initialized2")
expectedPrints.append("Setting up build.sc")

val res = Await.result(evalResult, Duration.apply(maxDuration, SECONDS))

val (shows, prints) = res.out.linesIterator.toVector.partition(_.startsWith("\""))

assert(prints == expectedPrints)
if (show) assert(shows == expectedShows.map('"' + _ + '"'))
}

test("sources") {

test("noshow") - testWatchSource(false)
test("show") - testWatchSource(true)
}

def testWatchInput(show: Boolean) = {
val showArgs = if (show) Seq("show") else Nil

val evalResult = Future { evalTimeoutStdout(maxDuration, "--watch", showArgs, "lol") }
val expectedPrints = collection.mutable.Buffer.empty[String]
val expectedShows = collection.mutable.Buffer.empty[String]

awaitCompletionMarker("initialized0")
awaitCompletionMarker("lolRan0")
expectedPrints.append(
"Setting up build.sc",
"Running lol baz contents initial-baz"
)
expectedShows.append("Running lol baz contents initial-baz")

os.write.over(wsRoot / "baz.txt", "edited-baz")
awaitCompletionMarker("lolRan1")
expectedPrints.append("Running lol baz contents edited-baz")
expectedShows.append("Running lol baz contents edited-baz")

os.write.over(wsRoot / "watchValue.txt", "edited-watchValue")
awaitCompletionMarker("initialized1")
expectedPrints.append("Setting up build.sc")
expectedShows.append("Running lol baz contents edited-baz")

os.write.over(wsRoot / "watchValue.txt", "exit")
awaitCompletionMarker("initialized2")
expectedPrints.append("Setting up build.sc")

val res = Await.result(evalResult, Duration.apply(maxDuration, SECONDS))

val (shows, prints) = res.out.linesIterator.toVector.partition(_.startsWith("\""))
assert(prints == expectedPrints)
if (show) assert(shows == expectedShows.map('"' + _ + '"'))
}

test("input") {

test("noshow") - testWatchInput(false)
test("show") - testWatchInput(true)
}
}
}
27 changes: 13 additions & 14 deletions integration/src/mill/integration/IntegrationTestSuite.scala
Original file line number Diff line number Diff line change
@@ -1,21 +1,14 @@
package mill.integration

import mainargs.Flag
import mill.runner.MillCliConfig
import mill.main.SelectMode
import mill.runner.{MillBuildBootstrap, MillMain, RunnerState, Watching}
import mill.api.SystemStreams
import mill.util.PrintLogger
import os.Path
import mill.runner.RunnerState
import os.{Path, Shellable}
import utest._

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, PrintStream}
import java.nio.file.NoSuchFileException
import scala.util.control.NonFatal

object IntegrationTestSuite {
case class EvalResult(isSuccess: Boolean, out: String, err: String)

}

abstract class IntegrationTestSuite extends TestSuite {
Expand All @@ -37,16 +30,20 @@ abstract class IntegrationTestSuite extends TestSuite {

var runnerState = RunnerState.empty

def eval(s: String*): Boolean = evalFork(os.Inherit, os.Inherit, s)
def eval(s: Shellable*): Boolean = evalFork(os.Inherit, os.Inherit, s, -1)

def evalStdout(s: Shellable*): IntegrationTestSuite.EvalResult = {
evalTimeoutStdout(-1, s: _*)
}

def evalStdout(s: String*): IntegrationTestSuite.EvalResult = {
def evalTimeoutStdout(timeout: Long, s: Shellable*): IntegrationTestSuite.EvalResult = {

val output = Seq.newBuilder[String]
val error = Seq.newBuilder[String]
val processOutput = os.ProcessOutput.Readlines(output += _)
val processError = os.ProcessOutput.Readlines(error += _)

val result = evalFork(processOutput, processError, s)
val result = evalFork(processOutput, processError, s, timeout)
IntegrationTestSuite.EvalResult(
result,
output.result().mkString("\n"),
Expand All @@ -61,7 +58,8 @@ abstract class IntegrationTestSuite extends TestSuite {
private def evalFork(
stdout: os.ProcessOutput,
stderr: os.ProcessOutput,
s: Seq[String]
s: Seq[Shellable],
timeout: Long
): Boolean = {
val serverArgs =
if (integrationTestMode == "server" || integrationTestMode == "local") Seq()
Expand All @@ -75,7 +73,8 @@ abstract class IntegrationTestSuite extends TestSuite {
stdin = os.Inherit,
stdout = stdout,
stderr = stderr,
env = millTestSuiteEnv
env = millTestSuiteEnv,
timeout = timeout
)
true
} catch {
Expand Down
4 changes: 4 additions & 0 deletions main/api/src/mill/api/Result.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ sealed trait Result[+T] {
def map[V](f: T => V): Result[V]
def flatMap[V](f: T => Result[V]): Result[V]
def asSuccess: Option[Result.Success[T]] = None
def asFailing: Option[Result.Failing[T]] = None

}

object Result {
Expand Down Expand Up @@ -56,6 +58,8 @@ object Result {
sealed trait Failing[+T] extends Result[T] {
def map[V](f: T => V): Failing[V]
def flatMap[V](f: T => Result[V]): Failing[V]
override def asFailing: Option[Result.Failing[T]] = Some(this)

}

/**
Expand Down
Loading

0 comments on commit bde3d59

Please sign in to comment.