Skip to content

Commit

Permalink
Use virtual threads (#199)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rakambda authored Jul 13, 2024
1 parent f147abe commit 3ebfda3
Show file tree
Hide file tree
Showing 13 changed files with 314 additions and 418 deletions.
34 changes: 22 additions & 12 deletions src/main/java/fr/rakambda/mediaconverter/ConsoleHandler.java
Original file line number Diff line number Diff line change
@@ -1,23 +1,32 @@
package fr.rakambda.mediaconverter;

import fr.rakambda.mediaconverter.mediaprocessor.MediaProcessorTask;
import fr.rakambda.mediaconverter.utils.Continue;
import lombok.extern.log4j.Log4j2;
import org.jetbrains.annotations.NotNull;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Scanner;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;

/**
* Handles commands sent in the standard input.
*/
@Log4j2
class ConsoleHandler extends Thread implements AutoCloseable{
private static final int WAIT_DELAY = 10000;
private final Collection<IProcessor> processors = new LinkedList<>();
private final Continue aContinue;
private final Collection<ExecutorService> executorServices;
private final Collection<MediaProcessorTask> tasks;
private boolean stop;

ConsoleHandler(){
ConsoleHandler(Continue aContinue){
super();
this.aContinue = aContinue;
this.executorServices = new LinkedList<>();
this.tasks = new LinkedList<>();

stop = false;
setDaemon(true);
setName("Console watcher");
Expand Down Expand Up @@ -46,19 +55,16 @@ public void run(){
var command = args.poll();
if("q".equals(command)){
log.info("Exiting");
processors.forEach(IProcessor::resume);
processors.forEach(IProcessor::close);
processors.stream()
.map(IProcessor::getOutputQueue)
.forEach(Collection::clear);
this.executorServices.forEach(ExecutorService::shutdownNow);
this.tasks.forEach(MediaProcessorTask::cancel);
}
else if("p".equals(command)){
log.info("Pausing");
processors.forEach(IProcessor::pause);
aContinue.pause();
}
else if("r".equals(command)){
log.info("Resuming");
processors.forEach(IProcessor::resume);
aContinue.resume();
}
}
catch(Exception e){
Expand All @@ -76,7 +82,11 @@ public void close(){
stop = true;
}

public void add(@NotNull IProcessor processor){
processors.add(processor);
public void registerExecutor(ExecutorService executor){
executorServices.add(executor);
}

public void registerTasks(ConcurrentLinkedDeque<MediaProcessorTask> converters){
tasks.addAll(converters);
}
}
17 changes: 0 additions & 17 deletions src/main/java/fr/rakambda/mediaconverter/IProcessor.java

This file was deleted.

74 changes: 37 additions & 37 deletions src/main/java/fr/rakambda/mediaconverter/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@
import fr.rakambda.mediaconverter.file.FileProberFilter;
import fr.rakambda.mediaconverter.file.FileProcessor;
import fr.rakambda.mediaconverter.file.FileScanner;
import fr.rakambda.mediaconverter.mediaprocessor.MediaProcessorTask;
import fr.rakambda.mediaconverter.progress.ConversionProgressExecutor;
import fr.rakambda.mediaconverter.progress.ConverterProgressBarGenerator;
import fr.rakambda.mediaconverter.progress.ProgressBarSupplier;
import fr.rakambda.mediaconverter.progress.ReuseProgressBarSupplier;
import fr.rakambda.mediaconverter.utils.CLIParameters;
import fr.rakambda.mediaconverter.utils.Continue;
import fr.rakambda.mediaconverter.utils.PausableThreadPoolExecutor;
import lombok.extern.log4j.Log4j2;
import me.tongfei.progressbar.ProgressBar;
import me.tongfei.progressbar.ProgressBarBuilder;
Expand All @@ -26,13 +29,13 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Supplier;

@Log4j2
Expand Down Expand Up @@ -65,18 +68,29 @@ public static void main(String[] args){
List<Path> tempPaths;

var progressBarGenerator = new ConverterProgressBarGenerator();
try(var converterExecutor = ConversionProgressExecutor.of(Executors.newFixedThreadPool(parameters.getThreadCount()));
var aContinue = new Continue();
try(var converterExecutor = ConversionProgressExecutor.of(new PausableThreadPoolExecutor(parameters.getThreadCount(), aContinue));
var scanningProgressBar = new ProgressBarBuilder().setTaskName("Scanning").setUnit("File", 1).build();
var converterProgressBarSupplier = new ReuseProgressBarSupplier(progressBarGenerator);
var consoleHandler = new ConsoleHandler()){
var consoleHandler = new ConsoleHandler(aContinue)){
tempPaths = new ArrayList<>(Configuration.loadConfiguration(parameters.getConfiguration())
.stream()
.flatMap(config -> config.getConversions().stream())
.filter(Conversion::isEnabled)
.parallel()
.map(conv -> {
try{
return Main.convert(conv, ffmpegSupplier, ffprobeSupplier, converterExecutor, scanningProgressBar, converterProgressBarSupplier, parameters.isDryRun(), parameters.getFfmpegThreadCount(), parameters.getFfprobeThreadCount(), consoleHandler);
return Main.convert(
conv,
ffmpegSupplier,
ffprobeSupplier,
converterExecutor,
scanningProgressBar,
converterProgressBarSupplier,
parameters.isDryRun(),
parameters.getFfmpegThreadCount(),
consoleHandler
);
}
catch(IOException e){
log.error("Failed to perform conversion", e);
Expand Down Expand Up @@ -107,7 +121,6 @@ private static Path convert(
@NotNull ProgressBarSupplier converterProgressBarSupplier,
boolean dryRun,
@Nullable Integer ffmpegThreads,
@NotNull Integer ffprobeThreadCount,
@NotNull ConsoleHandler consoleHandler
) throws IOException{
var tempDirectory = conversion.createTempDirectory();
Expand All @@ -119,45 +132,32 @@ private static Path convert(
throw new IllegalArgumentException("Output path " + conversion.getOutput().toAbsolutePath() + " doesn't exists");
}

ExecutorService es = null;
try(var storage = conversion.getStorage()){
es = Executors.newCachedThreadPool();

var scannerOutput = new LinkedBlockingQueue<Path>(500);
var fileFilterOutput = new LinkedBlockingQueue<Path>(500);
var proberOutput = new LinkedBlockingQueue<FileProber.ProbeResult>(50);
var proberFilterOutput = new LinkedBlockingQueue<FileProber.ProbeResult>(50);

var processors = new LinkedList<IProcessor>();
var fileScanner = new FileScanner(scanningProgressBar, storage, conversion.getAbsoluteExcluded(), scannerOutput);
var fileProcessor = new FileProcessor(converterExecutor, ffmpegSupplier, tempDirectory, conversion.getInput(), conversion.getOutput(), proberFilterOutput, scanningProgressBar, converterProgressBarSupplier, conversion.isDeleteInput(), ffmpegThreads, dryRun);
var converters = new ConcurrentLinkedDeque<MediaProcessorTask>();
consoleHandler.registerTasks(converters);

try(var storage = conversion.getStorage();
var virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor()){
consoleHandler.registerExecutor(virtualThreadExecutor);

processors.add(fileScanner);
processors.add(new FileFilter(scanningProgressBar, storage, scannerOutput, fileFilterOutput, conversion.getExtensions()));
for(int i = 0; i < ffprobeThreadCount; i++){
processors.add(new FileProber(scanningProgressBar, storage, fileFilterOutput, proberOutput, ffprobeSupplier, conversion.getProcessors()));
}
processors.add(new FileProberFilter(scanningProgressBar, proberOutput, proberFilterOutput, conversion.getFilters()));
processors.add(fileProcessor);
Consumer<MediaProcessorTask> converterRunner = converter -> {
converters.add(converter);
converter.execute(converterExecutor, dryRun);
};
Consumer<FileProber.ProbeResult> fileProcessor = probeResult -> new FileProcessor(probeResult, ffmpegSupplier, tempDirectory, conversion.getInput(), conversion.getOutput(), scanningProgressBar, converterProgressBarSupplier, conversion.isDeleteInput(), ffmpegThreads, converterRunner).run();
Consumer<FileProber.ProbeResult> fileProberFilter = probeResult -> new FileProberFilter(probeResult, scanningProgressBar, conversion.getFilters(), fileProcessor).run();
Consumer<Path> fileProber = file -> new FileProber(file, scanningProgressBar, storage, ffprobeSupplier, conversion.getProcessors(), fileProberFilter).run();
Consumer<Path> fileFilter = file -> virtualThreadExecutor.execute(new FileFilter(file, scanningProgressBar, storage, conversion.getExtensions(), fileProber));

processors.forEach(consoleHandler::add);
var fileScanner = new FileScanner(scanningProgressBar, storage, conversion.getAbsoluteExcluded(), fileFilter);

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
processors.forEach(IProcessor::close);

virtualThreadExecutor.shutdownNow();
converterExecutor.shutdownNow();
fileProcessor.cancel();

converters.forEach(MediaProcessorTask::cancel);
}));

es.submit(fileProcessor);
processors.forEach(es::submit);
Files.walkFileTree(conversion.getInput(), fileScanner);
processors.forEach(IProcessor::close);
}
finally{
if(Objects.nonNull(es)){
es.shutdownNow();
}
}
}
catch(Exception e){
Expand Down
137 changes: 49 additions & 88 deletions src/main/java/fr/rakambda/mediaconverter/file/FileFilter.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package fr.rakambda.mediaconverter.file;

import fr.rakambda.mediaconverter.IProcessor;
import fr.rakambda.mediaconverter.storage.IStorage;
import lombok.Getter;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import me.tongfei.progressbar.ProgressBar;
Expand All @@ -11,103 +9,66 @@
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

@Slf4j
public class FileFilter implements Runnable, AutoCloseable, IProcessor{
private final ProgressBar progressBar;
private final IStorage storage;
private final BlockingQueue<Path> inputQueue;
@Getter
private final BlockingQueue<Path> outputQueue;
private final Collection<String> extensionsToScan;
private final CountDownLatch countDownLatch;
private boolean shutdown;
public class FileFilter implements Runnable{
private final Path file;
private final ProgressBar progressBar;
private final IStorage storage;
private final Collection<String> extensionsToScan;
private final Consumer<Path> callback;

public FileFilter(@NonNull ProgressBar progressBar,
public FileFilter(@NonNull Path file,
@NonNull ProgressBar progressBar,
@NonNull IStorage storage,
@NonNull BlockingQueue<Path> inputQueue,
@NonNull BlockingQueue<Path> outputQueue,
@NonNull Collection<String> extensionsToScan){
this.progressBar = progressBar;
this.storage = storage;
this.inputQueue = inputQueue;
this.outputQueue = outputQueue;
@NonNull Collection<String> extensionsToScan,
@NonNull Consumer<Path> callback
){
this.file = file;
this.progressBar = progressBar;
this.storage = storage;
this.extensionsToScan = extensionsToScan;

shutdown = false;
countDownLatch = new CountDownLatch(1);
}

@Override
public void run() {
try {
do {
var file = inputQueue.poll(5, TimeUnit.SECONDS);
if (Objects.nonNull(file)) {
if (processFile(file)) {
outputQueue.put(file);
} else {
progressBar.step();
}
}
}
while (!shutdown || !inputQueue.isEmpty());
} catch (InterruptedException e) {
log.error("Error waiting for element", e);
} finally {
countDownLatch.countDown();
}
}

private boolean processFile(@NonNull Path file) {
try {
if (storage.isUseless(file)) {
return false;
}

if (isNotMedia(file) || Files.isHidden(file)) {
storage.setUseless(file);
return false;
}
return true;
} catch (SQLException | IOException e) {
log.error("Failed to filter file {}", file, e);
return false;
}
}

private boolean isNotMedia(@NonNull Path file) {
var filename = file.getFileName().toString();
var dotIndex = filename.lastIndexOf('.');

if (dotIndex <= 0) {
return true;
}

var extension = filename.substring(dotIndex + 1).toLowerCase();
return !extensionsToScan.contains(extension);
}

@Override
public void resume(){
this.callback = callback;
}

@Override
public void pause(){
public void run(){
if(processFile(file)){
callback.accept(file);
}
else{
progressBar.step();
}
}

@Override
public void close(){
shutdown = true;
private boolean processFile(@NonNull Path file){
try{
countDownLatch.await();
if(storage.isUseless(file)){
return false;
}

if(isNotMedia(file) || Files.isHidden(file)){
storage.setUseless(file);
return false;
}
return true;
}
catch(InterruptedException e){
log.info("Failed to wait for latch", e);
catch(SQLException | IOException e){
log.error("Failed to filter file {}", file, e);
return false;
}
}
}

private boolean isNotMedia(@NonNull Path file){
var filename = file.getFileName().toString();
var dotIndex = filename.lastIndexOf('.');

if(dotIndex <= 0){
return true;
}

var extension = filename.substring(dotIndex + 1).toLowerCase();
return !extensionsToScan.contains(extension);
}
}
Loading

0 comments on commit 3ebfda3

Please sign in to comment.