From 3f7488bdaf0e930577d4398ebf5bd3552e5b9a47 Mon Sep 17 00:00:00 2001 From: Martin Duhem Date: Tue, 20 Jun 2017 21:22:31 +0200 Subject: [PATCH 1/2] New `WatchService` based on Java 7's `WatchService` The goal of this is to add an abstraction layer so that we can define new kinds of `WatchService`s. This way, we can use the default `WatchService` provided by the JDK on certain platform (e.g. Windows), and revert back to the `PollingWatchService` that existed before, on machine where there's no native support for monitoring the file system (e.g. macOS). This commit includes an adapter for the `WatchService` from Java NIO to our new interface, and a port of the existing `PollingWatchService`. --- .travis.yml | 2 +- build.sbt | 2 +- .../internal/io/SourceModificationWatch.scala | 192 ++++++++++-- .../scala/sbt/io/PollingWatchService.scala | 165 ++++++++++ io/src/main/scala/sbt/io/WatchService.scala | 90 ++++++ .../internal/io/DefaultWatchServiceSpec.scala | 13 + .../internal/io/PollingWatchServiceSpec.scala | 6 + .../io/SourceModificationWatchSpec.scala | 294 ++++++++++++++++++ 8 files changed, 728 insertions(+), 36 deletions(-) create mode 100644 io/src/main/scala/sbt/io/PollingWatchService.scala create mode 100644 io/src/main/scala/sbt/io/WatchService.scala create mode 100644 io/src/test/scala/sbt/internal/io/DefaultWatchServiceSpec.scala create mode 100644 io/src/test/scala/sbt/internal/io/PollingWatchServiceSpec.scala create mode 100644 io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala diff --git a/.travis.yml b/.travis.yml index 4da1aced..4fc3b720 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,6 +3,6 @@ scala: - 2.10.6 - 2.11.11 - 2.12.2 - + - 2.13.0-M1 jdk: - oraclejdk8 diff --git a/build.sbt b/build.sbt index c444b7ad..0b5c9de1 100644 --- a/build.sbt +++ b/build.sbt @@ -1,7 +1,7 @@ import Dependencies._ // import com.typesafe.tools.mima.core._, ProblemFilters._ -def baseVersion: String = "1.0.0-M8" +def baseVersion: String = "1.0.0-M12" def commonSettings: Seq[Setting[_]] = Seq( scalaVersion := scala212, diff --git a/io/src/main/scala/sbt/internal/io/SourceModificationWatch.scala b/io/src/main/scala/sbt/internal/io/SourceModificationWatch.scala index 0d5ac507..58634095 100644 --- a/io/src/main/scala/sbt/internal/io/SourceModificationWatch.scala +++ b/io/src/main/scala/sbt/internal/io/SourceModificationWatch.scala @@ -3,45 +3,169 @@ */ package sbt.internal.io -import annotation.tailrec -import sbt.io.PathFinder +import java.nio.file.{ Files, Path, WatchEvent, WatchKey } +import java.nio.file.StandardWatchEventKinds._ + +import sbt.io.{ DirectoryFilter, FileFilter, WatchService } +import sbt.io.syntax._ + +import scala.annotation.tailrec private[sbt] object SourceModificationWatch { - @tailrec def watch(sourcesFinder: PathFinder, pollDelayMillis: Int, state: WatchState)(terminationCondition: => Boolean): (Boolean, WatchState) = - { - import state._ - - val sourceFiles: Iterable[java.io.File] = sourcesFinder.get - val sourceFilesPath: Set[String] = sourceFiles.map(_.getCanonicalPath)(collection.breakOut) - val lastModifiedTime = - (0L /: sourceFiles) { (acc, file) => math.max(acc, file.lastModified) } - - val sourcesModified = - lastModifiedTime > lastCallbackCallTime || - previousFiles != sourceFilesPath - - val (triggered, newCallbackCallTime) = - if (sourcesModified) - (false, System.currentTimeMillis) - else - (awaitingQuietPeriod, lastCallbackCallTime) - - val newState = new WatchState(newCallbackCallTime, sourceFilesPath, sourcesModified, if (triggered) count + 1 else count) - if (triggered) - (true, newState) - else { - Thread.sleep(pollDelayMillis.toLong) - if (terminationCondition) - (false, newState) - else - watch(sourcesFinder, pollDelayMillis, newState)(terminationCondition) + + /** + * Checks for modifications on the file system every `delayMillis` milliseconds, + * until changes are detected or `terminationCondition` evaluates to `true`. + */ + @tailrec + def watch(delayMillis: Long, state: WatchState)(terminationCondition: => Boolean): (Boolean, WatchState) = { + if (state.count == 0) (true, state.withCount(1)) + else { + val events = + state.pollEvents().map(expandEvent) + + if (events.isEmpty) { + if (terminationCondition) { + (false, state) + } else { + Thread.sleep(delayMillis) + watch(delayMillis, state)(terminationCondition) + } + } else { + val previousFiles = state.registered.keySet + val newFiles = state.sources.flatMap(_.getUnfilteredPaths()).toSet + val createdFiles = newFiles -- previousFiles + val deletedFiles = previousFiles -- newFiles + + // We may have events that are not relevant (e.g., created an empty directory.) + // We filter out those changes, so that we don't trigger unnecessarily. + val filteredDeleted = deletedFiles.filter(p => state.sources.exists(_.accept(p, false))) + val filteredCreated = createdFiles.filter(p => state.sources.exists(_.accept(p, false))) + val filteredModified = events.collect { + case (p, ENTRY_MODIFY) if state.sources.exists(_.accept(p, false)) => p + } + + // Register and remove _unfiltered_ files. This is correct because directories + // are likely to be filtered out (for instance), but we should still add them + // to the files that are watched. + // We don't increment count because we don't know yet if we'll trigger. + val newState = state ++ createdFiles -- deletedFiles + + if (filteredCreated.nonEmpty || filteredDeleted.nonEmpty || filteredModified.nonEmpty) { + (true, newState.withCount(newState.count + 1)) + } else { + Thread.sleep(delayMillis) + watch(delayMillis, newState)(terminationCondition) + } + } + } + } + + private def expandEvent(event: (Path, WatchEvent[_])): (Path, WatchEvent.Kind[Path]) = { + event match { + case (base, ev) => + val fullPath = base.resolve(ev.context().asInstanceOf[Path]) + val kind = ev.kind().asInstanceOf[WatchEvent.Kind[Path]] + (fullPath, kind) + } + } +} + +/** The state of the file watch. */ +final class WatchState private ( + val count: Int, + private[sbt] val sources: Seq[Source], + service: WatchService, + private[sbt] val registered: Map[Path, WatchKey] +) { + /** Removes all of `fs` from the watched paths. */ + private[sbt] def --(fs: Iterable[Path]): WatchState = { + for { f <- fs; + wk <- registered.get(f); + if (registered.values.count(_ == wk)) <= 1 } wk.cancel() + withRegistered(registered -- fs) + } + + /** Adds all of `fs` to the watched paths. */ + private[sbt] def ++(fs: Iterable[Path]): WatchState = { + val newKeys = + fs.filter(Files.exists(_)).foldLeft(registered) { + case (ks, d) if Files.isDirectory(d) => + if (registered.contains(d)) ks + else ks + (d -> service.register(d, WatchState.events: _*)) + + case (ks, f) => + val parent = f.getParent + if (registered.contains(parent)) ks + (f -> registered(parent)) + else ks + (f -> service.register(parent, WatchState.events: _*)) } + withRegistered(newKeys) + } + + /** Retrieve events from the `WatchService` */ + private[sbt] def pollEvents(): Iterable[(Path, WatchEvent[_])] = { + val events = service.pollEvents + events.toIterable.flatMap { + case (k, evs) => evs.map((k.watchable().asInstanceOf[Path], _)) } + } + + /** A new state, with a new `count`. */ + private[sbt] def withCount(count: Int): WatchState = + new WatchState(count, sources, service, registered) + + /** A new state, with new keys registered. */ + private[sbt] def withRegistered(registered: Map[Path, WatchKey]): WatchState = + new WatchState(count, sources, service, registered) } -private[sbt] final class WatchState(val lastCallbackCallTime: Long, val previousFiles: Set[String], val awaitingQuietPeriod: Boolean, val count: Int) { - def previousFileCount: Int = previousFiles.size + +/** + * Represents how to acquire a list of items to watch. + * @param base Where to start looking for files. + * @param includeFilter Filter to apply to determine whether to include a file. + * @param excludeFilter Filter to apply to determine whether to ignore a file. + */ +final class Source(base: File, includeFilter: FileFilter, excludeFilter: FileFilter) { + /** + * Determine whether `p` should be included in this source. + * @param p The path to test. + * @param includeDirs Whether all directories should be included. + * @return True, if `p` should be included, false otherwise. + */ + private[sbt] def accept(p: Path, includeDirs: Boolean = false): Boolean = { + val inc = + if (includeDirs) DirectoryFilter || includeFilter + else includeFilter + + p.startsWith(base.toPath) && + inc.accept(p.toFile) && + !excludeFilter.accept(p.toFile) + } + + /** + * Gathers all the paths from this source without applying filters. + * @return A sequence of all the paths collected from this source. + */ + private[sbt] def getUnfilteredPaths(): Seq[Path] = + base.allPaths.get.map(_.toPath) } -private[sbt] object WatchState { - def empty = new WatchState(0L, Set.empty[String], false, 0) +object WatchState { + /** What events should be monitored */ + val events: Array[WatchEvent.Kind[Path]] = Array(ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY) + + /** + * An empty `WatchState`. + * @param service The `WatchService` to use to monitor the file system. + * @param sources The sources from where to collect the paths. + * @return An initial `WatchState`. + */ + def empty(service: WatchService, sources: Seq[Source]): WatchState = { + val initFiles = sources.flatMap(_.getUnfilteredPaths()) + assert(initFiles.nonEmpty) + val initState = new WatchState(0, sources, service, Map.empty) ++ initFiles + service.init() + initState + } + } diff --git a/io/src/main/scala/sbt/io/PollingWatchService.scala b/io/src/main/scala/sbt/io/PollingWatchService.scala new file mode 100644 index 00000000..3dba9b85 --- /dev/null +++ b/io/src/main/scala/sbt/io/PollingWatchService.scala @@ -0,0 +1,165 @@ +package sbt.io + +import java.nio.file.{ClosedWatchServiceException, Files, Path => JPath, Watchable, WatchKey, WatchEvent} +import java.nio.file.StandardWatchEventKinds._ +import java.util.{List => JList} + +import sbt.io.syntax._ +import scala.collection.mutable + +/** A `WatchService` that polls the filesystem every `delayMs` milliseconds. */ +class PollingWatchService(delayMs: Long) extends WatchService { + private var closed: Boolean = false + private val thread: PollingThread = new PollingThread(delayMs) + private val keys: mutable.Map[JPath, PollingWatchKey] = mutable.Map.empty + private val pathLengthOrdering: Ordering[JPath] = + Ordering.fromLessThan { + case (null, _) | (_, null) => true + case (a, b) => + a.toString.length < b.toString.length + } + + private val watched: mutable.Map[JPath, Seq[WatchEvent.Kind[JPath]]] = + mutable.Map.empty + + override def close(): Unit = + closed = true + + override def init(): Unit = { + ensureNotClosed() + thread.start() + while (!thread.initDone) { + Thread.sleep(100) + } + } + + override def poll(timeoutMs: Long): WatchKey = thread.keysWithEvents.synchronized { + ensureNotClosed() + thread.keysWithEvents.headOption.map { k => + thread.keysWithEvents -= k + k + }.orNull + } + + override def pollEvents(): Map[WatchKey, Seq[WatchEvent[JPath]]] = thread.keysWithEvents.synchronized { + import scala.collection.JavaConverters._ + ensureNotClosed() + val events = thread.keysWithEvents.map { k => + k -> k.pollEvents().asScala.asInstanceOf[Seq[WatchEvent[JPath]]] + } + thread.keysWithEvents.clear() + events.toMap + } + + override def register(path: JPath, events: WatchEvent.Kind[JPath]*): WatchKey = { + ensureNotClosed() + val key = new PollingWatchKey(thread, path, new java.util.ArrayList[WatchEvent[_]]) + keys += path -> key + watched += path -> events + key + } + + private def ensureNotClosed(): Unit = + if (closed) throw new ClosedWatchServiceException + + private class PollingThread(delayMs: Long) extends Thread { + private var fileTimes: Map[JPath, Long] = Map.empty + var initDone = false + val keysWithEvents = mutable.LinkedHashSet.empty[WatchKey] + + override def run(): Unit = + while (!closed) { + populateEvents() + initDone = true + Thread.sleep(delayMs) + } + + def getFileTimes(): Map[JPath, Long] = { + val results = mutable.Map.empty[JPath, Long] + watched.toSeq.sortBy(_._1)(pathLengthOrdering).foreach { + case (p, _) => + if (!results.contains(p)) + p.toFile.allPaths.get.foreach(f => results += f.toPath -> f.lastModified) + } + results.toMap + } + + private def addEvent(path: JPath, ev: WatchEvent[JPath]): Unit = keysWithEvents.synchronized { + keys.get(path).foreach { k => + keysWithEvents += k + k.events.add(ev) + } + } + + private def populateEvents(): Unit = { + val newFileTimes = getFileTimes() + val newFiles = newFileTimes.keySet + val oldFiles = fileTimes.keySet + + val deletedFiles = (oldFiles -- newFiles).toSeq + val createdFiles = (newFiles -- oldFiles).toSeq + + val modifiedFiles = fileTimes.collect { + case (p, oldTime) if newFileTimes.getOrElse(p, 0L) > oldTime => p + } + fileTimes = newFileTimes + + deletedFiles.foreach { deleted => + val parent = deleted.getParent + if (watched.getOrElse(parent, Seq.empty).contains(ENTRY_DELETE)) { + val ev = new PollingWatchEvent(parent.relativize(deleted), ENTRY_DELETE) + addEvent(parent, ev) + } + watched -= deleted + } + + createdFiles.sorted(pathLengthOrdering).foreach { + case dir if Files.isDirectory(dir) => + val parent = dir.getParent + val parentEvents = watched.getOrElse(parent, Seq.empty) + if (parentEvents.contains(ENTRY_CREATE)) { + val ev = new PollingWatchEvent(parent.relativize(dir), ENTRY_CREATE) + addEvent(parent, ev) + } + + case file => + val parent = file.getParent + if (watched.getOrElse(parent, Seq.empty).contains(ENTRY_CREATE)) { + val ev = new PollingWatchEvent(parent.relativize(file), ENTRY_CREATE) + addEvent(parent, ev) + } + } + + modifiedFiles.foreach { + case file => + val parent = file.getParent + if (watched.getOrElse(parent, Seq.empty).contains(ENTRY_MODIFY)) { + val ev = new PollingWatchEvent(parent.relativize(file), ENTRY_MODIFY) + addEvent(parent, ev) + } + } + } + + } + + private class PollingWatchKey(origin: PollingThread, + override val watchable: Watchable, + val events: JList[WatchEvent[_]]) extends WatchKey { + override def cancel(): Unit = () + override def isValid(): Boolean = true + override def pollEvents(): java.util.List[WatchEvent[_]] = origin.keysWithEvents.synchronized { + origin.keysWithEvents -= this + val evs = new java.util.ArrayList[WatchEvent[_]](events) + events.clear() + evs + } + override def reset(): Boolean = true + } + +} + +private class PollingWatchEvent(override val context: JPath, + override val kind: WatchEvent.Kind[JPath]) extends WatchEvent[JPath] { + override val count: Int = 1 +} + diff --git a/io/src/main/scala/sbt/io/WatchService.scala b/io/src/main/scala/sbt/io/WatchService.scala new file mode 100644 index 00000000..df74dd91 --- /dev/null +++ b/io/src/main/scala/sbt/io/WatchService.scala @@ -0,0 +1,90 @@ +package sbt.io + +import java.nio.file.{ ClosedWatchServiceException, WatchEvent, WatchKey, Path => JPath, WatchService => JWatchService } +import java.util.concurrent.TimeUnit + +import scala.collection.mutable + +import scala.collection.JavaConverters._ + +object WatchService { + + /** + * Adapts a Java `WatchService` to be used with sbt's `WatchService` infrastructure. + * @param service The `WatchService` to use. + */ + implicit final class WatchServiceAdapter(service: JWatchService) extends WatchService { + private var closed: Boolean = false + private val registered: mutable.Buffer[WatchKey] = mutable.Buffer.empty + + override def init(): Unit = + () + + override def pollEvents(): Map[WatchKey, Seq[WatchEvent[JPath]]] = + registered.flatMap { k => + val events = k.pollEvents() + if (events.isEmpty) None + else Some((k, events.asScala.asInstanceOf[Seq[WatchEvent[JPath]]])) + }.toMap + + override def poll(timeoutMs: Long): WatchKey = { + service.poll(timeoutMs, TimeUnit.MILLISECONDS) + } + + override def register(path: JPath, events: WatchEvent.Kind[JPath]*): WatchKey = { + if (closed) throw new ClosedWatchServiceException + else { + val key = path.register(service, events: _*) + registered += key + key + } + } + + override def close(): Unit = { + closed = true + service.close() + } + + override def toString(): String = + service.toString() + } + +} + +/** + * A service that will monitor the file system for file creation, deletion + * and modification. + */ +trait WatchService { + + /** Initializes the watchservice. */ + def init(): Unit + + /** + * Retrieves all the events and groups them by watch key. + * Does not wait if no event is available. + * @return The pending events. + */ + def pollEvents(): Map[WatchKey, Seq[WatchEvent[JPath]]] + + /** + * Retrieves the next `WatchKey` that has a `WatchEvent` waiting. Waits + * up to `timeoutMs` milliseconds if no such key exists. + * @param timeoutMs Maximum time to wait, in milliseconds. + * @return The next `WatchKey` that received an event. + */ + def poll(timeoutMs: Long): WatchKey + + /** + * Registers a path to be monitored. + * @param path The path to monitor. + * @param events The events that should be registered. + * @return A `WatchKey`, that represents a token of registration. + */ + def register(path: JPath, events: WatchEvent.Kind[JPath]*): WatchKey + + /** + * Closes this `WatchService`. + */ + def close(): Unit +} diff --git a/io/src/test/scala/sbt/internal/io/DefaultWatchServiceSpec.scala b/io/src/test/scala/sbt/internal/io/DefaultWatchServiceSpec.scala new file mode 100644 index 00000000..7b453968 --- /dev/null +++ b/io/src/test/scala/sbt/internal/io/DefaultWatchServiceSpec.scala @@ -0,0 +1,13 @@ +package sbt.internal.io + +import java.nio.file.FileSystems + +object DefaultWatchServiceSpec { + val (pollDelayMs, maxWaitTimeMs) = + Option(sys.props("os.name")) match { + case Some("Mac OS X") => (200L, 15000L) + case _ => (50L, 3000L) + } +} +class DefaultWatchServiceSpec extends SourceModificationWatchSpec(FileSystems.getDefault.newWatchService, DefaultWatchServiceSpec.pollDelayMs, DefaultWatchServiceSpec.maxWaitTimeMs) + diff --git a/io/src/test/scala/sbt/internal/io/PollingWatchServiceSpec.scala b/io/src/test/scala/sbt/internal/io/PollingWatchServiceSpec.scala new file mode 100644 index 00000000..464edee2 --- /dev/null +++ b/io/src/test/scala/sbt/internal/io/PollingWatchServiceSpec.scala @@ -0,0 +1,6 @@ +package sbt.internal.io + +import sbt.io.PollingWatchService + +class PollingWatchServiceSpec extends SourceModificationWatchSpec(new PollingWatchService(500L), 500L, 3000L) + diff --git a/io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala b/io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala new file mode 100644 index 00000000..c409d09b --- /dev/null +++ b/io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala @@ -0,0 +1,294 @@ +package sbt.internal.io + +import java.nio.file.{ ClosedWatchServiceException, Paths } + +import org.scalatest.{ Assertion, FlatSpec, Matchers } +import sbt.io.syntax._ +import sbt.io.{ IO, SimpleFilter, WatchService } + +abstract class SourceModificationWatchSpec(getService: => WatchService, pollDelayMs: Long, maxWaitMs: Long) extends FlatSpec with Matchers { + + it should "watch a directory for file creation" in IO.withTemporaryDirectory { dir => + val parentDir = dir / "src" / "watchme" + val created = parentDir / "NewSource.scala" + + IO.createDirectory(parentDir) + + watchTest(parentDir)(pollDelayMs, maxWaitMs) { + IO.write(created, "foo") + } + } + + it should "ignore creation of directories with no tracked sources" in IO.withTemporaryDirectory { dir => + val parentDir = dir / "src" / "watchme" + val created = parentDir / "ignoreme" + + IO.createDirectory(parentDir) + + watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + IO.createDirectory(created) + } + } + + it should "ignore creation of files that do not match inclusion filter" in IO.withTemporaryDirectory { dir => + val parentDir = dir / "src" / "watchme" + val created = parentDir / "ignoreme" + + IO.createDirectory(parentDir) + + watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + IO.touch(created) + } + } + + it should "ignore creation of files that are explicitly ignored" in IO.withTemporaryDirectory { dir => + val parentDir = dir / "src" / "watchme" + val created = parentDir / ".hidden.scala" + + IO.createDirectory(parentDir) + + watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + IO.touch(created) + } + } + + it should "ignore creation of an empty directory" in IO.withTemporaryDirectory { dir => + val parentDir = dir / "src" / "watchme" + val created = parentDir / "ignoreme" + + IO.createDirectory(parentDir) + + watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + IO.createDirectory(created) + } + } + + it should "detect files created in a subdirectory" in IO.withTemporaryDirectory { dir => + val parentDir = dir / "src" / "watchme" + val subDir = parentDir / "sub" + val created = subDir / "NewSource.scala" + + IO.createDirectory(subDir) + + watchTest(parentDir)(pollDelayMs, maxWaitMs) { + IO.write(created, "foo") + } + } + + it should "ignore creation of files not included in inclusion filter in subdirectories" in IO.withTemporaryDirectory { dir => + val parentDir = dir / "src" / "watchme" + val subDir = parentDir / "sub" + val created = subDir / "ignoreme" + + IO.createDirectory(subDir) + + watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + IO.touch(created) + } + } + + it should "ignore creation of files explicitly ignored in subdirectories" in IO.withTemporaryDirectory { dir => + val parentDir = dir / "src" / "watchme" + val subDir = parentDir / "sub" + val created = subDir / ".hidden.scala" + + IO.createDirectory(subDir) + + watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + IO.touch(created) + } + } + + it should "ignore creation of empty directories in a subdirectory" in IO.withTemporaryDirectory { dir => + val parentDir = dir / "src" / "watchme" + val subDir = parentDir / "sub" + val created = subDir / "ignoreme" + + IO.createDirectory(subDir) + + watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + IO.createDirectory(created) + } + } + + it should "detect deleted files" in IO.withTemporaryDirectory { dir => + val parentDir = dir / "src" / "watchme" + val file = parentDir / "WillBeDeleted.scala" + IO.write(file, "foo") + + watchTest(parentDir)(pollDelayMs, maxWaitMs) { + IO.delete(file) + } + } + + it should "ignore deletion of files not included in inclusion filter" in IO.withTemporaryDirectory { dir => + val parentDir = dir / "src" / "watchme" + val file = parentDir / "ignoreme" + IO.write(file, "foo") + + watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + IO.delete(file) + } + } + + it should "ignore deletion of files explicitly ignored" in IO.withTemporaryDirectory { dir => + val parentDir = dir / "src" / "watchme" + val file = parentDir / ".hidden.scala" + IO.write(file, "foo") + + watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + IO.delete(file) + } + } + + it should "ignore deletion of empty directories" in IO.withTemporaryDirectory { dir => + val parentDir = dir / "src" / "watchme" + val subDir = parentDir / "ignoreme" + IO.createDirectory(subDir) + + watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + IO.delete(subDir) + } + } + + it should "detect deleted files in subdirectories" in IO.withTemporaryDirectory { dir => + val parentDir = dir / "src" / "watchme" + val subDir = parentDir / "subdir" + val willBeDeleted = subDir / "WillBeDeleted.scala" + IO.write(willBeDeleted, "foo") + + watchTest(parentDir)(pollDelayMs, maxWaitMs) { + IO.delete(willBeDeleted) + } + } + + it should "ignore deletion of files not included in inclusion filter in subdirectories" in IO.withTemporaryDirectory { dir => + val parentDir = dir / "src" / "watchme" + val subDir = parentDir / "subdir" + val willBeDeleted = subDir / "ignoreme" + IO.write(willBeDeleted, "foo") + + watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + IO.delete(willBeDeleted) + } + } + + it should "ignore deletion of files explicitly ignored in subdirectories" in IO.withTemporaryDirectory { dir => + val parentDir = dir / "src" / "watchme" + val subDir = parentDir / "subdir" + val willBeDeleted = subDir / ".hidden.scala" + IO.write(willBeDeleted, "foo") + + watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + IO.delete(willBeDeleted) + } + } + + it should "ignore deletion of empty directories in subdirectories" in IO.withTemporaryDirectory { dir => + val parentDir = dir / "src" / "watchme" + val subDir = parentDir / "subdir" + val willBeDeleted = subDir / "ignoreme" + IO.createDirectory(willBeDeleted) + + watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + IO.delete(willBeDeleted) + } + } + + it should "ignore creation and then deletion of empty directories" in IO.withTemporaryDirectory { dir => + val parentDir = dir / "src" / "watchme" + val subDir = parentDir / "subdir" + val service = getService + IO.createDirectory(parentDir) + + try { + val initState = emptyState(service, parentDir) + val (triggered0, newState0) = watchTest(initState)(pollDelayMs, maxWaitMs) { + IO.createDirectory(subDir) + } + triggered0 shouldBe false + newState0.count shouldBe 1 + + val (triggered1, newState1) = watchTest(newState0)(pollDelayMs, maxWaitMs) { + IO.delete(subDir) + } + triggered1 shouldBe false + newState1.count shouldBe 1 + } finally service.close() + } + + it should "detect deletion of a directory containing watched files" in IO.withTemporaryDirectory { dir => + val parentDir = dir / "src" / "watchme" + val subDir = parentDir / "subdir" + val src = subDir / "src.scala" + val service = getService + + IO.createDirectory(parentDir) + + try { + val initState = emptyState(service, parentDir) + val (triggered0, newState0) = watchTest(initState)(pollDelayMs, maxWaitMs) { + IO.createDirectory(subDir) + IO.touch(src) + } + triggered0 shouldBe true + newState0.count shouldBe 2 + + val (triggered1, newState1) = watchTest(newState0)(pollDelayMs, maxWaitMs) { + IO.delete(subDir) + } + triggered1 shouldBe true + newState1.count shouldBe 3 + } finally service.close() + } + + "WatchService.poll" should "throw a `ClosedWatchServiceException` if used after `close`" in { + val service = getService + service.close() + assertThrows[ClosedWatchServiceException](service.poll(1000L)) + } + + "WatchService.register" should "throw a `ClosedWatchServiceException` if used after `close`" in { + val service = getService + service.close() + assertThrows[ClosedWatchServiceException](service.register(Paths.get("."))) + } + + "WatchService.close" should "not throw if called multiple times" in { + val service = getService + service.close() + service.close() + } + + private def watchTest(initState: WatchState)(pollDelayMs: Long, maxWaitMs: Long)(modifier: => Unit): (Boolean, WatchState) = { + var started = false + val startTime = System.currentTimeMillis() + val modThread = new Thread { + override def run(): Unit = { + modifier + } + } + SourceModificationWatch.watch(pollDelayMs, initState) { + if (!started) { + started = true + modThread.start() + } + System.currentTimeMillis() - startTime > maxWaitMs + } + } + + private def watchTest(base: File)(pollDelayMs: Long, maxWaitMs: Long, expectedTrigger: Boolean = true)(modifier: => Unit): Assertion = { + val service = getService + try { + val initState = emptyState(service, base) + val (triggered, _) = watchTest(initState)(pollDelayMs, maxWaitMs)(modifier) + triggered shouldBe expectedTrigger + } finally service.close() + } + + private def emptyState(service: WatchService, base: File): WatchState = { + val sources = Seq(new Source(base, "*.scala", new SimpleFilter(_.startsWith(".")))) + WatchState.empty(service, sources).withCount(1) + } + +} From 75afc791f44fa93056178a15d1a90fe9d78b7f98 Mon Sep 17 00:00:00 2001 From: Martin Duhem Date: Thu, 29 Jun 2017 09:12:37 +0200 Subject: [PATCH 2/2] Use `Duration` --- .../internal/io/SourceModificationWatch.scala | 15 ++--- .../scala/sbt/io/PollingWatchService.scala | 19 ++++-- io/src/main/scala/sbt/io/WatchService.scala | 25 +++++--- .../internal/io/DefaultWatchServiceSpec.scala | 14 +++-- .../internal/io/PollingWatchServiceSpec.scala | 6 +- .../io/SourceModificationWatchSpec.scala | 60 ++++++++++--------- 6 files changed, 84 insertions(+), 55 deletions(-) diff --git a/io/src/main/scala/sbt/internal/io/SourceModificationWatch.scala b/io/src/main/scala/sbt/internal/io/SourceModificationWatch.scala index 58634095..43bc0679 100644 --- a/io/src/main/scala/sbt/internal/io/SourceModificationWatch.scala +++ b/io/src/main/scala/sbt/internal/io/SourceModificationWatch.scala @@ -10,6 +10,7 @@ import sbt.io.{ DirectoryFilter, FileFilter, WatchService } import sbt.io.syntax._ import scala.annotation.tailrec +import scala.concurrent.duration.FiniteDuration private[sbt] object SourceModificationWatch { @@ -18,7 +19,7 @@ private[sbt] object SourceModificationWatch { * until changes are detected or `terminationCondition` evaluates to `true`. */ @tailrec - def watch(delayMillis: Long, state: WatchState)(terminationCondition: => Boolean): (Boolean, WatchState) = { + def watch(delay: FiniteDuration, state: WatchState)(terminationCondition: => Boolean): (Boolean, WatchState) = { if (state.count == 0) (true, state.withCount(1)) else { val events = @@ -28,8 +29,8 @@ private[sbt] object SourceModificationWatch { if (terminationCondition) { (false, state) } else { - Thread.sleep(delayMillis) - watch(delayMillis, state)(terminationCondition) + Thread.sleep(delay.toMillis) + watch(delay, state)(terminationCondition) } } else { val previousFiles = state.registered.keySet @@ -54,8 +55,8 @@ private[sbt] object SourceModificationWatch { if (filteredCreated.nonEmpty || filteredDeleted.nonEmpty || filteredModified.nonEmpty) { (true, newState.withCount(newState.count + 1)) } else { - Thread.sleep(delayMillis) - watch(delayMillis, newState)(terminationCondition) + Thread.sleep(delay.toMillis) + watch(delay, newState)(terminationCondition) } } } @@ -72,7 +73,7 @@ private[sbt] object SourceModificationWatch { } /** The state of the file watch. */ -final class WatchState private ( +private[sbt] final class WatchState private ( val count: Int, private[sbt] val sources: Seq[Source], service: WatchService, @@ -150,7 +151,7 @@ final class Source(base: File, includeFilter: FileFilter, excludeFilter: FileFil base.allPaths.get.map(_.toPath) } -object WatchState { +private[sbt] object WatchState { /** What events should be monitored */ val events: Array[WatchEvent.Kind[Path]] = Array(ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY) diff --git a/io/src/main/scala/sbt/io/PollingWatchService.scala b/io/src/main/scala/sbt/io/PollingWatchService.scala index 3dba9b85..21693d84 100644 --- a/io/src/main/scala/sbt/io/PollingWatchService.scala +++ b/io/src/main/scala/sbt/io/PollingWatchService.scala @@ -6,11 +6,12 @@ import java.util.{List => JList} import sbt.io.syntax._ import scala.collection.mutable +import scala.concurrent.duration.{Duration, FiniteDuration} -/** A `WatchService` that polls the filesystem every `delayMs` milliseconds. */ -class PollingWatchService(delayMs: Long) extends WatchService { +/** A `WatchService` that polls the filesystem every `delay`. */ +class PollingWatchService(delay: FiniteDuration) extends WatchService { private var closed: Boolean = false - private val thread: PollingThread = new PollingThread(delayMs) + private val thread: PollingThread = new PollingThread(delay) private val keys: mutable.Map[JPath, PollingWatchKey] = mutable.Map.empty private val pathLengthOrdering: Ordering[JPath] = Ordering.fromLessThan { @@ -33,8 +34,13 @@ class PollingWatchService(delayMs: Long) extends WatchService { } } - override def poll(timeoutMs: Long): WatchKey = thread.keysWithEvents.synchronized { + override def poll(timeout: Duration): WatchKey = thread.keysWithEvents.synchronized { ensureNotClosed() + thread.keysWithEvents.synchronized { + if (thread.keysWithEvents.isEmpty) { + thread.keysWithEvents.wait(timeout.toMillis) + } + } thread.keysWithEvents.headOption.map { k => thread.keysWithEvents -= k k @@ -62,7 +68,7 @@ class PollingWatchService(delayMs: Long) extends WatchService { private def ensureNotClosed(): Unit = if (closed) throw new ClosedWatchServiceException - private class PollingThread(delayMs: Long) extends Thread { + private class PollingThread(delay: FiniteDuration) extends Thread { private var fileTimes: Map[JPath, Long] = Map.empty var initDone = false val keysWithEvents = mutable.LinkedHashSet.empty[WatchKey] @@ -71,7 +77,7 @@ class PollingWatchService(delayMs: Long) extends WatchService { while (!closed) { populateEvents() initDone = true - Thread.sleep(delayMs) + Thread.sleep(delay.toMillis) } def getFileTimes(): Map[JPath, Long] = { @@ -88,6 +94,7 @@ class PollingWatchService(delayMs: Long) extends WatchService { keys.get(path).foreach { k => keysWithEvents += k k.events.add(ev) + keysWithEvents.notifyAll() } } diff --git a/io/src/main/scala/sbt/io/WatchService.scala b/io/src/main/scala/sbt/io/WatchService.scala index df74dd91..1999fc04 100644 --- a/io/src/main/scala/sbt/io/WatchService.scala +++ b/io/src/main/scala/sbt/io/WatchService.scala @@ -3,9 +3,10 @@ package sbt.io import java.nio.file.{ ClosedWatchServiceException, WatchEvent, WatchKey, Path => JPath, WatchService => JWatchService } import java.util.concurrent.TimeUnit +import scala.annotation.tailrec import scala.collection.mutable - import scala.collection.JavaConverters._ +import scala.concurrent.duration.Duration object WatchService { @@ -27,9 +28,16 @@ object WatchService { else Some((k, events.asScala.asInstanceOf[Seq[WatchEvent[JPath]]])) }.toMap - override def poll(timeoutMs: Long): WatchKey = { - service.poll(timeoutMs, TimeUnit.MILLISECONDS) - } + @tailrec + override def poll(timeout: Duration): WatchKey = + if (timeout.isFinite) { + service.poll(timeout.toMillis, TimeUnit.MILLISECONDS) + } else { + service.poll(1000L, TimeUnit.MILLISECONDS) match { + case null => poll(timeout) + case key => key + } + } override def register(path: JPath, events: WatchEvent.Kind[JPath]*): WatchKey = { if (closed) throw new ClosedWatchServiceException @@ -69,11 +77,12 @@ trait WatchService { /** * Retrieves the next `WatchKey` that has a `WatchEvent` waiting. Waits - * up to `timeoutMs` milliseconds if no such key exists. - * @param timeoutMs Maximum time to wait, in milliseconds. - * @return The next `WatchKey` that received an event. + * until the `timeout` is expired is no such key exists. + * @param timeout Maximum time to wait + * @return The next `WatchKey` that received an event, or null if no such + * key exists. */ - def poll(timeoutMs: Long): WatchKey + def poll(timeout: Duration): WatchKey /** * Registers a path to be monitored. diff --git a/io/src/test/scala/sbt/internal/io/DefaultWatchServiceSpec.scala b/io/src/test/scala/sbt/internal/io/DefaultWatchServiceSpec.scala index 7b453968..ec96c53c 100644 --- a/io/src/test/scala/sbt/internal/io/DefaultWatchServiceSpec.scala +++ b/io/src/test/scala/sbt/internal/io/DefaultWatchServiceSpec.scala @@ -2,12 +2,18 @@ package sbt.internal.io import java.nio.file.FileSystems +import scala.concurrent.duration._ + object DefaultWatchServiceSpec { - val (pollDelayMs, maxWaitTimeMs) = + // java.nio's default watch service is much slower on MacOS at the moment. + // We give it more time to detect changes. + val (pollDelay, maxWaitTime) = Option(sys.props("os.name")) match { - case Some("Mac OS X") => (200L, 15000L) - case _ => (50L, 3000L) + case Some("Mac OS X") => (1.second, 15.seconds) + case _ => (50.milliseconds, 3.seconds) } } -class DefaultWatchServiceSpec extends SourceModificationWatchSpec(FileSystems.getDefault.newWatchService, DefaultWatchServiceSpec.pollDelayMs, DefaultWatchServiceSpec.maxWaitTimeMs) +class DefaultWatchServiceSpec extends SourceModificationWatchSpec(FileSystems.getDefault.newWatchService, + DefaultWatchServiceSpec.pollDelay, + DefaultWatchServiceSpec.maxWaitTime) diff --git a/io/src/test/scala/sbt/internal/io/PollingWatchServiceSpec.scala b/io/src/test/scala/sbt/internal/io/PollingWatchServiceSpec.scala index 464edee2..57901c5a 100644 --- a/io/src/test/scala/sbt/internal/io/PollingWatchServiceSpec.scala +++ b/io/src/test/scala/sbt/internal/io/PollingWatchServiceSpec.scala @@ -2,5 +2,9 @@ package sbt.internal.io import sbt.io.PollingWatchService -class PollingWatchServiceSpec extends SourceModificationWatchSpec(new PollingWatchService(500L), 500L, 3000L) +import scala.concurrent.duration._ + +class PollingWatchServiceSpec extends SourceModificationWatchSpec(new PollingWatchService(500.milliseconds), + 500.milliseconds, + 3.seconds) diff --git a/io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala b/io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala index c409d09b..a8831243 100644 --- a/io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala +++ b/io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala @@ -6,7 +6,9 @@ import org.scalatest.{ Assertion, FlatSpec, Matchers } import sbt.io.syntax._ import sbt.io.{ IO, SimpleFilter, WatchService } -abstract class SourceModificationWatchSpec(getService: => WatchService, pollDelayMs: Long, maxWaitMs: Long) extends FlatSpec with Matchers { +import scala.concurrent.duration._ + +abstract class SourceModificationWatchSpec(getService: => WatchService, pollDelay: FiniteDuration, maxWait: FiniteDuration) extends FlatSpec with Matchers { it should "watch a directory for file creation" in IO.withTemporaryDirectory { dir => val parentDir = dir / "src" / "watchme" @@ -14,7 +16,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela IO.createDirectory(parentDir) - watchTest(parentDir)(pollDelayMs, maxWaitMs) { + watchTest(parentDir)(pollDelay, maxWait) { IO.write(created, "foo") } } @@ -25,7 +27,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela IO.createDirectory(parentDir) - watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + watchTest(parentDir)(pollDelay, maxWait, expectedTrigger = false) { IO.createDirectory(created) } } @@ -36,7 +38,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela IO.createDirectory(parentDir) - watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + watchTest(parentDir)(pollDelay, maxWait, expectedTrigger = false) { IO.touch(created) } } @@ -47,7 +49,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela IO.createDirectory(parentDir) - watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + watchTest(parentDir)(pollDelay, maxWait, expectedTrigger = false) { IO.touch(created) } } @@ -58,7 +60,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela IO.createDirectory(parentDir) - watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + watchTest(parentDir)(pollDelay, maxWait, expectedTrigger = false) { IO.createDirectory(created) } } @@ -70,7 +72,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela IO.createDirectory(subDir) - watchTest(parentDir)(pollDelayMs, maxWaitMs) { + watchTest(parentDir)(pollDelay, maxWait) { IO.write(created, "foo") } } @@ -82,7 +84,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela IO.createDirectory(subDir) - watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + watchTest(parentDir)(pollDelay, maxWait, expectedTrigger = false) { IO.touch(created) } } @@ -94,7 +96,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela IO.createDirectory(subDir) - watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + watchTest(parentDir)(pollDelay, maxWait, expectedTrigger = false) { IO.touch(created) } } @@ -106,7 +108,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela IO.createDirectory(subDir) - watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + watchTest(parentDir)(pollDelay, maxWait, expectedTrigger = false) { IO.createDirectory(created) } } @@ -116,7 +118,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela val file = parentDir / "WillBeDeleted.scala" IO.write(file, "foo") - watchTest(parentDir)(pollDelayMs, maxWaitMs) { + watchTest(parentDir)(pollDelay, maxWait) { IO.delete(file) } } @@ -126,7 +128,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela val file = parentDir / "ignoreme" IO.write(file, "foo") - watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + watchTest(parentDir)(pollDelay, maxWait, expectedTrigger = false) { IO.delete(file) } } @@ -136,7 +138,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela val file = parentDir / ".hidden.scala" IO.write(file, "foo") - watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + watchTest(parentDir)(pollDelay, maxWait, expectedTrigger = false) { IO.delete(file) } } @@ -146,7 +148,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela val subDir = parentDir / "ignoreme" IO.createDirectory(subDir) - watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + watchTest(parentDir)(pollDelay, maxWait, expectedTrigger = false) { IO.delete(subDir) } } @@ -157,7 +159,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela val willBeDeleted = subDir / "WillBeDeleted.scala" IO.write(willBeDeleted, "foo") - watchTest(parentDir)(pollDelayMs, maxWaitMs) { + watchTest(parentDir)(pollDelay, maxWait) { IO.delete(willBeDeleted) } } @@ -168,7 +170,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela val willBeDeleted = subDir / "ignoreme" IO.write(willBeDeleted, "foo") - watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + watchTest(parentDir)(pollDelay, maxWait, expectedTrigger = false) { IO.delete(willBeDeleted) } } @@ -179,7 +181,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela val willBeDeleted = subDir / ".hidden.scala" IO.write(willBeDeleted, "foo") - watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + watchTest(parentDir)(pollDelay, maxWait, expectedTrigger = false) { IO.delete(willBeDeleted) } } @@ -190,7 +192,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela val willBeDeleted = subDir / "ignoreme" IO.createDirectory(willBeDeleted) - watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + watchTest(parentDir)(pollDelay, maxWait, expectedTrigger = false) { IO.delete(willBeDeleted) } } @@ -203,13 +205,13 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela try { val initState = emptyState(service, parentDir) - val (triggered0, newState0) = watchTest(initState)(pollDelayMs, maxWaitMs) { + val (triggered0, newState0) = watchTest(initState)(pollDelay, maxWait) { IO.createDirectory(subDir) } triggered0 shouldBe false newState0.count shouldBe 1 - val (triggered1, newState1) = watchTest(newState0)(pollDelayMs, maxWaitMs) { + val (triggered1, newState1) = watchTest(newState0)(pollDelay, maxWait) { IO.delete(subDir) } triggered1 shouldBe false @@ -227,14 +229,14 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela try { val initState = emptyState(service, parentDir) - val (triggered0, newState0) = watchTest(initState)(pollDelayMs, maxWaitMs) { + val (triggered0, newState0) = watchTest(initState)(pollDelay, maxWait) { IO.createDirectory(subDir) IO.touch(src) } triggered0 shouldBe true newState0.count shouldBe 2 - val (triggered1, newState1) = watchTest(newState0)(pollDelayMs, maxWaitMs) { + val (triggered1, newState1) = watchTest(newState0)(pollDelay, maxWait) { IO.delete(subDir) } triggered1 shouldBe true @@ -245,7 +247,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela "WatchService.poll" should "throw a `ClosedWatchServiceException` if used after `close`" in { val service = getService service.close() - assertThrows[ClosedWatchServiceException](service.poll(1000L)) + assertThrows[ClosedWatchServiceException](service.poll(1.second)) } "WatchService.register" should "throw a `ClosedWatchServiceException` if used after `close`" in { @@ -260,28 +262,28 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela service.close() } - private def watchTest(initState: WatchState)(pollDelayMs: Long, maxWaitMs: Long)(modifier: => Unit): (Boolean, WatchState) = { + private def watchTest(initState: WatchState)(pollDelay: FiniteDuration, maxWait: FiniteDuration)(modifier: => Unit): (Boolean, WatchState) = { var started = false - val startTime = System.currentTimeMillis() + val deadline = maxWait.fromNow val modThread = new Thread { override def run(): Unit = { modifier } } - SourceModificationWatch.watch(pollDelayMs, initState) { + SourceModificationWatch.watch(pollDelay, initState) { if (!started) { started = true modThread.start() } - System.currentTimeMillis() - startTime > maxWaitMs + deadline.isOverdue() } } - private def watchTest(base: File)(pollDelayMs: Long, maxWaitMs: Long, expectedTrigger: Boolean = true)(modifier: => Unit): Assertion = { + private def watchTest(base: File)(pollDelay: FiniteDuration, maxWait: FiniteDuration, expectedTrigger: Boolean = true)(modifier: => Unit): Assertion = { val service = getService try { val initState = emptyState(service, base) - val (triggered, _) = watchTest(initState)(pollDelayMs, maxWaitMs)(modifier) + val (triggered, _) = watchTest(initState)(pollDelay, maxWait)(modifier) triggered shouldBe expectedTrigger } finally service.close() }