Skip to content

Commit

Permalink
Merge pull request #47 from Duhemm/topic/watchservice
Browse files Browse the repository at this point in the history
New `WatchService` based on Java 7's `WatchService`
  • Loading branch information
eed3si9n committed Jun 29, 2017
2 parents 880de81 + 75afc79 commit 177ba25
Show file tree
Hide file tree
Showing 7 changed files with 755 additions and 34 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import Dependencies._
// import com.typesafe.tools.mima.core._, ProblemFilters._

def baseVersion: String = "1.0.0-M8"
def baseVersion: String = "1.0.0-M12"

def commonSettings: Seq[Setting[_]] = Seq(
scalaVersion := scala212,
Expand Down
191 changes: 158 additions & 33 deletions io/src/main/scala/sbt/internal/io/SourceModificationWatch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,170 @@
*/
package sbt.internal.io

import annotation.tailrec
import sbt.io.PathFinder
import java.nio.file.{ Files, Path, WatchEvent, WatchKey }
import java.nio.file.StandardWatchEventKinds._

import sbt.io.{ DirectoryFilter, FileFilter, WatchService }
import sbt.io.syntax._

import scala.annotation.tailrec
import scala.concurrent.duration.FiniteDuration

private[sbt] object SourceModificationWatch {
@tailrec def watch(sourcesFinder: PathFinder, pollDelayMillis: Int, state: WatchState)(terminationCondition: => Boolean): (Boolean, WatchState) =
{
import state._

val sourceFiles: Iterable[java.io.File] = sourcesFinder.get
val sourceFilesPath: Set[String] = sourceFiles.map(_.getCanonicalPath)(collection.breakOut)
val lastModifiedTime =
(0L /: sourceFiles) { (acc, file) => math.max(acc, file.lastModified) }

val sourcesModified =
lastModifiedTime > lastCallbackCallTime ||
previousFiles != sourceFilesPath

val (triggered, newCallbackCallTime) =
if (sourcesModified)
(false, System.currentTimeMillis)
else
(awaitingQuietPeriod, lastCallbackCallTime)

val newState = new WatchState(newCallbackCallTime, sourceFilesPath, sourcesModified, if (triggered) count + 1 else count)
if (triggered)
(true, newState)
else {
Thread.sleep(pollDelayMillis.toLong)
if (terminationCondition)
(false, newState)
else
watch(sourcesFinder, pollDelayMillis, newState)(terminationCondition)

/**
* Checks for modifications on the file system every `delayMillis` milliseconds,
* until changes are detected or `terminationCondition` evaluates to `true`.
*/
@tailrec
def watch(delay: FiniteDuration, state: WatchState)(terminationCondition: => Boolean): (Boolean, WatchState) = {
if (state.count == 0) (true, state.withCount(1))
else {
val events =
state.pollEvents().map(expandEvent)

if (events.isEmpty) {
if (terminationCondition) {
(false, state)
} else {
Thread.sleep(delay.toMillis)
watch(delay, state)(terminationCondition)
}
} else {
val previousFiles = state.registered.keySet
val newFiles = state.sources.flatMap(_.getUnfilteredPaths()).toSet
val createdFiles = newFiles -- previousFiles
val deletedFiles = previousFiles -- newFiles

// We may have events that are not relevant (e.g., created an empty directory.)
// We filter out those changes, so that we don't trigger unnecessarily.
val filteredDeleted = deletedFiles.filter(p => state.sources.exists(_.accept(p, false)))
val filteredCreated = createdFiles.filter(p => state.sources.exists(_.accept(p, false)))
val filteredModified = events.collect {
case (p, ENTRY_MODIFY) if state.sources.exists(_.accept(p, false)) => p
}

// Register and remove _unfiltered_ files. This is correct because directories
// are likely to be filtered out (for instance), but we should still add them
// to the files that are watched.
// We don't increment count because we don't know yet if we'll trigger.
val newState = state ++ createdFiles -- deletedFiles

if (filteredCreated.nonEmpty || filteredDeleted.nonEmpty || filteredModified.nonEmpty) {
(true, newState.withCount(newState.count + 1))
} else {
Thread.sleep(delay.toMillis)
watch(delay, newState)(terminationCondition)
}
}
}
}

private def expandEvent(event: (Path, WatchEvent[_])): (Path, WatchEvent.Kind[Path]) = {
event match {
case (base, ev) =>
val fullPath = base.resolve(ev.context().asInstanceOf[Path])
val kind = ev.kind().asInstanceOf[WatchEvent.Kind[Path]]
(fullPath, kind)
}
}
}

/** The state of the file watch. */
private[sbt] final class WatchState private (
val count: Int,
private[sbt] val sources: Seq[Source],
service: WatchService,
private[sbt] val registered: Map[Path, WatchKey]
) {
/** Removes all of `fs` from the watched paths. */
private[sbt] def --(fs: Iterable[Path]): WatchState = {
for { f <- fs;
wk <- registered.get(f);
if (registered.values.count(_ == wk)) <= 1 } wk.cancel()
withRegistered(registered -- fs)
}

/** Adds all of `fs` to the watched paths. */
private[sbt] def ++(fs: Iterable[Path]): WatchState = {
val newKeys =
fs.filter(Files.exists(_)).foldLeft(registered) {
case (ks, d) if Files.isDirectory(d) =>
if (registered.contains(d)) ks
else ks + (d -> service.register(d, WatchState.events: _*))

case (ks, f) =>
val parent = f.getParent
if (registered.contains(parent)) ks + (f -> registered(parent))
else ks + (f -> service.register(parent, WatchState.events: _*))
}
withRegistered(newKeys)
}

/** Retrieve events from the `WatchService` */
private[sbt] def pollEvents(): Iterable[(Path, WatchEvent[_])] = {
val events = service.pollEvents
events.toIterable.flatMap {
case (k, evs) => evs.map((k.watchable().asInstanceOf[Path], _))
}
}

/** A new state, with a new `count`. */
private[sbt] def withCount(count: Int): WatchState =
new WatchState(count, sources, service, registered)

/** A new state, with new keys registered. */
private[sbt] def withRegistered(registered: Map[Path, WatchKey]): WatchState =
new WatchState(count, sources, service, registered)
}
private[sbt] final class WatchState(val lastCallbackCallTime: Long, val previousFiles: Set[String], val awaitingQuietPeriod: Boolean, val count: Int) {
def previousFileCount: Int = previousFiles.size

/**
* Represents how to acquire a list of items to watch.
* @param base Where to start looking for files.
* @param includeFilter Filter to apply to determine whether to include a file.
* @param excludeFilter Filter to apply to determine whether to ignore a file.
*/
final class Source(base: File, includeFilter: FileFilter, excludeFilter: FileFilter) {
/**
* Determine whether `p` should be included in this source.
* @param p The path to test.
* @param includeDirs Whether all directories should be included.
* @return True, if `p` should be included, false otherwise.
*/
private[sbt] def accept(p: Path, includeDirs: Boolean = false): Boolean = {
val inc =
if (includeDirs) DirectoryFilter || includeFilter
else includeFilter

p.startsWith(base.toPath) &&
inc.accept(p.toFile) &&
!excludeFilter.accept(p.toFile)
}

/**
* Gathers all the paths from this source without applying filters.
* @return A sequence of all the paths collected from this source.
*/
private[sbt] def getUnfilteredPaths(): Seq[Path] =
base.allPaths.get.map(_.toPath)
}

private[sbt] object WatchState {
def empty = new WatchState(0L, Set.empty[String], false, 0)
/** What events should be monitored */
val events: Array[WatchEvent.Kind[Path]] = Array(ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)

/**
* An empty `WatchState`.
* @param service The `WatchService` to use to monitor the file system.
* @param sources The sources from where to collect the paths.
* @return An initial `WatchState`.
*/
def empty(service: WatchService, sources: Seq[Source]): WatchState = {
val initFiles = sources.flatMap(_.getUnfilteredPaths())
assert(initFiles.nonEmpty)
val initState = new WatchState(0, sources, service, Map.empty) ++ initFiles
service.init()
initState
}

}
172 changes: 172 additions & 0 deletions io/src/main/scala/sbt/io/PollingWatchService.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package sbt.io

import java.nio.file.{ClosedWatchServiceException, Files, Path => JPath, Watchable, WatchKey, WatchEvent}
import java.nio.file.StandardWatchEventKinds._
import java.util.{List => JList}

import sbt.io.syntax._
import scala.collection.mutable
import scala.concurrent.duration.{Duration, FiniteDuration}

/** A `WatchService` that polls the filesystem every `delay`. */
class PollingWatchService(delay: FiniteDuration) extends WatchService {
private var closed: Boolean = false
private val thread: PollingThread = new PollingThread(delay)
private val keys: mutable.Map[JPath, PollingWatchKey] = mutable.Map.empty
private val pathLengthOrdering: Ordering[JPath] =
Ordering.fromLessThan {
case (null, _) | (_, null) => true
case (a, b) =>
a.toString.length < b.toString.length
}

private val watched: mutable.Map[JPath, Seq[WatchEvent.Kind[JPath]]] =
mutable.Map.empty

override def close(): Unit =
closed = true

override def init(): Unit = {
ensureNotClosed()
thread.start()
while (!thread.initDone) {
Thread.sleep(100)
}
}

override def poll(timeout: Duration): WatchKey = thread.keysWithEvents.synchronized {
ensureNotClosed()
thread.keysWithEvents.synchronized {
if (thread.keysWithEvents.isEmpty) {
thread.keysWithEvents.wait(timeout.toMillis)
}
}
thread.keysWithEvents.headOption.map { k =>
thread.keysWithEvents -= k
k
}.orNull
}

override def pollEvents(): Map[WatchKey, Seq[WatchEvent[JPath]]] = thread.keysWithEvents.synchronized {
import scala.collection.JavaConverters._
ensureNotClosed()
val events = thread.keysWithEvents.map { k =>
k -> k.pollEvents().asScala.asInstanceOf[Seq[WatchEvent[JPath]]]
}
thread.keysWithEvents.clear()
events.toMap
}

override def register(path: JPath, events: WatchEvent.Kind[JPath]*): WatchKey = {
ensureNotClosed()
val key = new PollingWatchKey(thread, path, new java.util.ArrayList[WatchEvent[_]])
keys += path -> key
watched += path -> events
key
}

private def ensureNotClosed(): Unit =
if (closed) throw new ClosedWatchServiceException

private class PollingThread(delay: FiniteDuration) extends Thread {
private var fileTimes: Map[JPath, Long] = Map.empty
var initDone = false
val keysWithEvents = mutable.LinkedHashSet.empty[WatchKey]

override def run(): Unit =
while (!closed) {
populateEvents()
initDone = true
Thread.sleep(delay.toMillis)
}

def getFileTimes(): Map[JPath, Long] = {
val results = mutable.Map.empty[JPath, Long]
watched.toSeq.sortBy(_._1)(pathLengthOrdering).foreach {
case (p, _) =>
if (!results.contains(p))
p.toFile.allPaths.get.foreach(f => results += f.toPath -> f.lastModified)
}
results.toMap
}

private def addEvent(path: JPath, ev: WatchEvent[JPath]): Unit = keysWithEvents.synchronized {
keys.get(path).foreach { k =>
keysWithEvents += k
k.events.add(ev)
keysWithEvents.notifyAll()
}
}

private def populateEvents(): Unit = {
val newFileTimes = getFileTimes()
val newFiles = newFileTimes.keySet
val oldFiles = fileTimes.keySet

val deletedFiles = (oldFiles -- newFiles).toSeq
val createdFiles = (newFiles -- oldFiles).toSeq

val modifiedFiles = fileTimes.collect {
case (p, oldTime) if newFileTimes.getOrElse(p, 0L) > oldTime => p
}
fileTimes = newFileTimes

deletedFiles.foreach { deleted =>
val parent = deleted.getParent
if (watched.getOrElse(parent, Seq.empty).contains(ENTRY_DELETE)) {
val ev = new PollingWatchEvent(parent.relativize(deleted), ENTRY_DELETE)
addEvent(parent, ev)
}
watched -= deleted
}

createdFiles.sorted(pathLengthOrdering).foreach {
case dir if Files.isDirectory(dir) =>
val parent = dir.getParent
val parentEvents = watched.getOrElse(parent, Seq.empty)
if (parentEvents.contains(ENTRY_CREATE)) {
val ev = new PollingWatchEvent(parent.relativize(dir), ENTRY_CREATE)
addEvent(parent, ev)
}

case file =>
val parent = file.getParent
if (watched.getOrElse(parent, Seq.empty).contains(ENTRY_CREATE)) {
val ev = new PollingWatchEvent(parent.relativize(file), ENTRY_CREATE)
addEvent(parent, ev)
}
}

modifiedFiles.foreach {
case file =>
val parent = file.getParent
if (watched.getOrElse(parent, Seq.empty).contains(ENTRY_MODIFY)) {
val ev = new PollingWatchEvent(parent.relativize(file), ENTRY_MODIFY)
addEvent(parent, ev)
}
}
}

}

private class PollingWatchKey(origin: PollingThread,
override val watchable: Watchable,
val events: JList[WatchEvent[_]]) extends WatchKey {
override def cancel(): Unit = ()
override def isValid(): Boolean = true
override def pollEvents(): java.util.List[WatchEvent[_]] = origin.keysWithEvents.synchronized {
origin.keysWithEvents -= this
val evs = new java.util.ArrayList[WatchEvent[_]](events)
events.clear()
evs
}
override def reset(): Boolean = true
}

}

private class PollingWatchEvent(override val context: JPath,
override val kind: WatchEvent.Kind[JPath]) extends WatchEvent[JPath] {
override val count: Int = 1
}

Loading

0 comments on commit 177ba25

Please sign in to comment.