Skip to content

Commit

Permalink
#64 Status bar should show an ETA and percentage of how much FXDS has…
Browse files Browse the repository at this point in the history
… processed yet
  • Loading branch information
mirkosertic committed May 4, 2019
1 parent 1d7f218 commit 6754ba7
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 50 deletions.
104 changes: 75 additions & 29 deletions src/main/java/de/mirkosertic/desktopsearch/Backend.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -65,30 +66,33 @@ public LuceneCommand(final FileEvent aFileEvent, final Content aContent) {
private final ContentExtractor contentExtractor;
private ProgressListener progressListener;
private final Map<Configuration.CrawlLocation, DirectoryWatcher> locations;
private final ExecutorPool executorPool;
private final Notifier notifier;
private final WatchServiceCache watchServiceCache;
private final PreviewProcessor previewProcessor;
private Configuration configuration;
private DirectoryListener directoryListener;
private final Statistics statistics;
private Thread progressInfo;

public Backend(final Notifier aNotifier, final Configuration aConfiguration, final PreviewProcessor aPreviewProcessor) throws IOException {
notifier = aNotifier;
previewProcessor = aPreviewProcessor;
locations = new HashMap<>();
executorPool = new ExecutorPool();
watchServiceCache = new WatchServiceCache();
contentExtractor = new ContentExtractor(aConfiguration);

statistics = new Statistics();
// This is our simple flux
Flux<FileEvent> theFileEventFlux = Flux.push(sink -> directoryListener = new DirectoryListener() {
final Flux<FileEvent> theFileEventFlux = Flux.push(sink -> directoryListener = new DirectoryListener() {

@Override
public void fileDeleted(final Configuration.CrawlLocation aLocation, final Path aFile) {
synchronized (this) {
try {
if (contentExtractor.supportsFile(aFile.toString())) {
final var theAttributes = Files.readAttributes(aFile, BasicFileAttributes.class);

statistics.newDeletedFileJob();

sink.next(new FileEvent(aLocation, aFile, theAttributes, FileEvent.EventType.DELETED));
}
} catch (final Exception e) {
Expand All @@ -103,20 +107,9 @@ public void fileCreatedOrModified(final Configuration.CrawlLocation aLocation, f
try {
if (contentExtractor.supportsFile(aFile.toString())) {
final var theAttributes = Files.readAttributes(aFile, BasicFileAttributes.class);
sink.next(new FileEvent(aLocation, aFile, theAttributes, FileEvent.EventType.UPDATED));
}
} catch (final Exception e) {
log.error("Error processing file {}", aFile, e);
}
}
}

@Override
public void fileFoundByCrawler(final Configuration.CrawlLocation aLocation, final Path aFile) {
synchronized (this) {
try {
if (contentExtractor.supportsFile(aFile.toString())) {
final var theAttributes = Files.readAttributes(aFile, BasicFileAttributes.class);
statistics.newModifiedFileJob();

sink.next(new FileEvent(aLocation, aFile, theAttributes, FileEvent.EventType.UPDATED));
}
} catch (final Exception e) {
Expand All @@ -127,7 +120,7 @@ public void fileFoundByCrawler(final Configuration.CrawlLocation aLocation, fina
});

// Filter update events for Files that were not changed
theFileEventFlux = theFileEventFlux.filter(aFileEvent -> {
final var theCheckFlux = theFileEventFlux.parallel(Math.max(1, Runtime.getRuntime().availableProcessors() / 2)).runOn(Schedulers.parallel()).filter(aFileEvent -> {
// Always keep delete file events
if (aFileEvent.type == FileEvent.EventType.DELETED) {
return true;
Expand All @@ -138,14 +131,19 @@ public void fileFoundByCrawler(final Configuration.CrawlLocation aLocation, fina
try {
final var theUpdateCheckResult = luceneIndexHandler
.checkIfModified(theFileName, aFileEvent.attributes.lastModifiedTime().toMillis());
return theUpdateCheckResult == UpdateCheckResult.UPDATED;

final boolean result = theUpdateCheckResult == UpdateCheckResult.UPDATED;
if (!result) {
statistics.jobSkipped();
}
return result;
} catch (final Exception e) {
throw Exceptions.propagate(e);
}
});

// Ok, we now map the file events to lucene commands
final var theLuceneFlux = theFileEventFlux.publishOn(Schedulers.newSingle("ContentExtractor")).map(
final var theLuceneFlux = theCheckFlux.map(
aFileEvent -> {
if (aFileEvent.type == FileEvent.EventType.DELETED) {
return new LuceneCommand(aFileEvent, null);
Expand All @@ -157,14 +155,13 @@ public void fileFoundByCrawler(final Configuration.CrawlLocation aLocation, fina
});

// Ok, finally we add everything to the index
theLuceneFlux.publishOn(Schedulers.newSingle("LuceneUpdater")).doOnNext(aCommand -> {
theLuceneFlux.doOnNext(aCommand -> {
if (aCommand.fileEvent.type == FileEvent.EventType.DELETED) {
try {
luceneIndexHandler.removeFromIndex(aCommand.fileEvent.path.toString());

aNotifier.showInformation("Deleted " + aCommand.fileEvent.path.getFileName());

progressListener.newFileFound(aCommand.fileEvent.path.toString());
} catch (Exception e) {
aNotifier.showError("Error removing " + aCommand.fileEvent.path.getFileName(), e);
}
Expand All @@ -175,22 +172,21 @@ public void fileFoundByCrawler(final Configuration.CrawlLocation aLocation, fina

notifier.showInformation("Reindexed " + aCommand.fileEvent.path.getFileName());

progressListener.newFileFound(aCommand.fileEvent.path.toString());

} catch (Exception e) {
aNotifier.showError("Error re-inxeding " + aCommand.fileEvent.path.getFileName(), e);
}
}
}
}).subscribe(new BaseSubscriber<>() {
}).sequential().subscribe(new BaseSubscriber<>() {
@Override
protected void hookOnSubscribe(final Subscription aSubscription) {
request(1);
request(Runtime.getRuntime().availableProcessors());
}

@Override
protected void hookOnNext(final LuceneCommand aCommand) {
log.info("Processed command for {}", aCommand.fileEvent.path);
statistics.jobFinished();
request(1);
}

Expand Down Expand Up @@ -224,12 +220,59 @@ public void configurationUpdated(final Configuration aConfiguration) throws IOEx
});
}

public void setProgressListener(final ProgressListener progressListener) {
this.progressListener = progressListener;
public void setProgressListener(final ProgressListener aProgressListener) {
progressListener = aProgressListener;

progressInfo = new Thread("ProgressInfo") {
@Override
public void run() {

long lastRemaining = -1;
final var format = NumberFormat.getIntegerInstance();
var lastMessage = "";

while (!isInterrupted()) {

final long totalJobs = statistics.totalJobs();
final long completedJobs = statistics.completedJobs();

final long remaining = Math.max(totalJobs - completedJobs, 0);

if (remaining > 0) {
if (lastRemaining == -1) {
lastMessage = remaining + " Files are still in the indexing queue.";
progressListener.infotext(lastMessage);
} else {
final var thruput = lastRemaining - remaining;
if (thruput > 0) {
final double eta = ((double) remaining) / thruput;
lastMessage = remaining + " Files are still in the indexing queue, " + format.format(eta) + " seconds remaining (ETA).";
progressListener.infotext(lastMessage);
} else {
if (lastMessage.length() > 0) {
progressListener.infotext(lastMessage);
}
}
}
} else {
lastMessage = "";
}

lastRemaining = remaining;

try {
Thread.sleep(1000);
} catch (final InterruptedException e) {

}
}
}
};
progressInfo.start();
}

private void add(final Configuration.CrawlLocation aLocation) throws IOException {
locations.put(aLocation, new DirectoryWatcher(watchServiceCache, aLocation, DirectoryWatcher.DEFAULT_WAIT_FOR_ACTION, directoryListener, executorPool).startWatching());
locations.put(aLocation, new DirectoryWatcher(watchServiceCache, aLocation, DirectoryWatcher.DEFAULT_WAIT_FOR_ACTION, directoryListener).startWatching());
}

private void setIndexLocation(final Configuration aConfiguration) throws IOException {
Expand Down Expand Up @@ -260,6 +303,9 @@ public void crawlLocations() {
}

public void shutdown() {
if (progressInfo != null) {
progressInfo.interrupt();
}
luceneIndexHandler.shutdown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,10 @@ private void wakeupThread() {
}
}

@Override public void newFileFound(final String aFilename) {
@Override public void infotext(final String aInfoText) {
wakeupThread();
watcherThread.notifyProgress();
Platform.runLater(() -> statusText.setText(aFilename));
Platform.runLater(() -> statusText.setText(aInfoText));
}

@Override public void crawlingFinished() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,4 @@ interface DirectoryListener {
void fileDeleted(Configuration.CrawlLocation aLocation, Path aFile);

void fileCreatedOrModified(Configuration.CrawlLocation aLocation, Path aFile);

void fileFoundByCrawler(Configuration.CrawlLocation aLocation, Path aFile);
}
42 changes: 26 additions & 16 deletions src/main/java/de/mirkosertic/desktopsearch/DirectoryWatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,8 @@
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.nio.file.*;
import java.util.*;

@Slf4j
public class DirectoryWatcher {
Expand Down Expand Up @@ -58,15 +48,14 @@ public boolean runOneCycle() {

private final WatchService watchService;
private final Thread watcherThread;
private final Thread monitorThread;
private final Map<Path, ActionTimer> fileTimers;
private final int waitForAction;
private final Timer actionTimer;
private final DirectoryListener directoryListener;
private final Configuration.CrawlLocation filesystemLocation;
private final ExecutorPool executorPool;

public DirectoryWatcher(final WatchServiceCache aWatchServiceCache, final Configuration.CrawlLocation aFileSystemLocation, final int aWaitForAction, final DirectoryListener aDirectoryListener, final ExecutorPool aExecutorPool) throws IOException {
executorPool = aExecutorPool;
public DirectoryWatcher(final WatchServiceCache aWatchServiceCache, final Configuration.CrawlLocation aFileSystemLocation, final int aWaitForAction, final DirectoryListener aDirectoryListener) throws IOException {
fileTimers = new HashMap<>();
waitForAction = aWaitForAction;
directoryListener = aDirectoryListener;
Expand Down Expand Up @@ -103,6 +92,25 @@ public void run() {
}
}
};
monitorThread = new Thread("Index-Monitor") {
@Override
public void run() {
while (!isInterrupted()) {
try {
synchronized (fileTimers) {
final var size = fileTimers.size();
if (size > 0) {
log.info("Currently {} files in index queue...", size);
}
}
Thread.sleep(1000);
} catch (final InterruptedException e) {
log.debug("Waiting interrupted", e);
}
}
}
};

actionTimer = new Timer();
}

Expand Down Expand Up @@ -180,6 +188,7 @@ public void run() {
theRegisterWatchers.start();

watcherThread.start();
monitorThread.start();
actionTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
Expand All @@ -192,6 +201,7 @@ public void run() {
public void stopWatching() {
actionTimer.cancel();
watcherThread.interrupt();
monitorThread.interrupt();
}

public void crawl() throws IOException {
Expand All @@ -200,7 +210,7 @@ public void crawl() throws IOException {

Files.walk(thePath).forEach(aPath -> {
if (!Files.isDirectory(aPath)) {
executorPool.execute(() -> directoryListener.fileFoundByCrawler(filesystemLocation, aPath));
publishActionFor(aPath, StandardWatchEventKinds.ENTRY_MODIFY);
}
});
}
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/de/mirkosertic/desktopsearch/NLP.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public static NLP forLanguage(final SupportedLanguage aLanguage) {
props.setProperty("annotators", "tokenize,ssplit,pos,lemma,ner");
props.setProperty("ner.useSUTime", "false");
props.setProperty("ner.applyFineGrained", "false");
props.setProperty("ner.maxSentenceLength", "200");
return new StanfordCoreNLP(props);
}), cachedBlacklist(aLanguage));
case de:
Expand All @@ -123,6 +124,7 @@ public static NLP forLanguage(final SupportedLanguage aLanguage) {
props.setProperty("annotators", "tokenize,ssplit,pos,lemma,ner");
props.setProperty("ner.useSUTime", "false");
props.setProperty("ner.applyFineGrained", "false");
props.setProperty("ner.maxSentenceLength", "200");
return new StanfordCoreNLP(props);
} catch (final IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

interface ProgressListener {

void newFileFound(String aFilename);
void infotext(String aInfoText);

void crawlingFinished();
}
Loading

0 comments on commit 6754ba7

Please sign in to comment.