diff --git a/.flake8 b/.flake8 index cc992968d..43a1b7693 100644 --- a/.flake8 +++ b/.flake8 @@ -1,2 +1,3 @@ [flake8] -select = E3, E4, F \ No newline at end of file +select = E3, E4, F +per-file-ignores = roop/core.py:E402 \ No newline at end of file diff --git a/.github/examples/face.jpg b/.github/examples/source.jpg similarity index 100% rename from .github/examples/face.jpg rename to .github/examples/source.jpg diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ef5fa6c95..0b8f4ce92 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,7 +13,9 @@ jobs: with: python-version: 3.9 - run: pip install flake8 - - run: flake8 run.py core + - run: pip install mypy + - run: flake8 run.py roop + - run: mypy --config-file mypi.ini run.py roop test: runs-on: ubuntu-latest steps: @@ -25,8 +27,7 @@ jobs: uses: actions/setup-python@v2 with: python-version: 3.9 - - run: pip install -r requirements.txt gdown - - run: gdown 13QpWFWJ37EB-nHrEOY64CEtQWY-tz7DZ - - run: ./run.py -f=.github/examples/face.jpg -t=.github/examples/target.mp4 -o=.github/examples/output.mp4 - - run: ffmpeg -i .github/examples/snapshot.mp4 -i .github/examples/output.mp4 -filter_complex "psnr" -f null - + - run: pip install -r requirements-ci.txt + - run: python run.py -s=.github/examples/source.jpg -t=.github/examples/target.mp4 -o=.github/examples/output.mp4 + - run: ffmpeg -i .github/examples/snapshot.mp4 -i .github/examples/output.mp4 -filter_complex psnr -f null - diff --git a/.gitignore b/.gitignore index 09916c438..e25e7ce3b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ .idea +models +temp __pycache__ -*.onnx \ No newline at end of file diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 000000000..7fb9cb146 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,21 @@ +## Pull Requests + +### Do + +- ...consider to fix bugs over adding features +- ...one pull request for one feature or improvement +- ...consult us about implementation details +- ...proper testing before you submit your code +- ...resolve failed CI pipelines + +### Don't + +- ...introduce fundamental changes in terms of software architecture +- ...introduce OOP - we accept functional programming only +- ...ignore given requirements or try to work around them +- ...submit code to a development branch without consulting us +- ...submit massive amount of code changes +- ...submit a proof of concept +- ...submit code that is using undocumented and private APIs +- ...solve third party issues in our project +- ...comment what your code does - use proper naming instead diff --git a/README.md b/README.md index c47fcb018..136a150e2 100644 --- a/README.md +++ b/README.md @@ -36,26 +36,32 @@ Additional command line arguments are given below: ``` options: -h, --help show this help message and exit - -f SOURCE_IMG, --face SOURCE_IMG - use this face + -s SOURCE_PATH, --source SOURCE_PATH + select an source image -t TARGET_PATH, --target TARGET_PATH - replace this face - -o OUTPUT_FILE, --output OUTPUT_FILE - save output to this file - --keep-fps maintain original fps - --keep-frames keep frames directory - --all-faces swap all faces in frame + select an target image or video + -o OUTPUT_PATH, --output OUTPUT_PATH + select output file or directory + --frame-processor {face_swapper,face_enhancer} [{face_swapper,face_enhancer} ...] + pipeline of frame processors + --keep-fps keep original fps + --keep-audio keep original audio + --keep-frames keep temporary frames + --many-faces process every face + --video-encoder {libx264,libx265,libvpx-vp9} + adjust output video encoder + --video-quality VIDEO_QUALITY + adjust output video quality --max-memory MAX_MEMORY - maximum amount of RAM in GB to be used - --cpu-cores CPU_CORES - number of CPU cores to use - --gpu-threads GPU_THREADS - number of threads to be use for the GPU - --gpu-vendor {apple,amd,intel,nvidia} - choice your GPU vendor + maximum amount of RAM in GB + --execution-provider {cpu,...} [{cpu,...} ...] + execution provider + --execution-threads EXECUTION_THREADS + number of execution threads + -v, --version show program's version number and exit ``` -Looking for a CLI mode? Using the -f/--face argument will make the program in cli mode. +Looking for a CLI mode? Using the -s/--source argument will make the program in cli mode. ## Credits - [henryruhs](https://github.com/henryruhs): for being an irreplaceable contributor to the project diff --git a/gui-demo.png b/gui-demo.png index 128eaf625..1972b59e2 100644 Binary files a/gui-demo.png and b/gui-demo.png differ diff --git a/mypi.ini b/mypi.ini new file mode 100644 index 000000000..64218bc23 --- /dev/null +++ b/mypi.ini @@ -0,0 +1,7 @@ +[mypy] +check_untyped_defs = True +disallow_any_generics = True +disallow_untyped_calls = True +disallow_untyped_defs = True +ignore_missing_imports = True +strict_optional = False diff --git a/requirements-ci.txt b/requirements-ci.txt new file mode 100644 index 000000000..cf8aff4df --- /dev/null +++ b/requirements-ci.txt @@ -0,0 +1,14 @@ +numpy==1.23.5 +opencv-python==4.7.0.72 +onnx==1.14.0 +insightface==0.7.3 +psutil==5.9.5 +tk==0.1.0 +customtkinter==5.1.3 +torch==2.0.1 +torchvision==0.15.2 +onnxruntime==1.15.0 +tensorflow==2.12.0 +opennsfw2==0.10.2 +protobuf==4.23.2 +tqdm==4.65.0 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index e25726fd4..859654edc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,9 +6,12 @@ onnx==1.14.0 insightface==0.7.3 psutil==5.9.5 tk==0.1.0 +customtkinter==5.1.3 pillow==9.5.0 torch==2.0.1+cu118; sys_platform != 'darwin' torch==2.0.1; sys_platform == 'darwin' +torchvision==0.15.2+cu118; sys_platform != 'darwin' +torchvision==0.15.2; sys_platform == 'darwin' onnxruntime==1.15.0; sys_platform == 'darwin' and platform_machine != 'arm64' onnxruntime-silicon==1.13.1; sys_platform == 'darwin' and platform_machine == 'arm64' onnxruntime-gpu==1.15.0; sys_platform != 'darwin' @@ -16,4 +19,5 @@ tensorflow==2.13.0rc1; sys_platform == 'darwin' tensorflow==2.12.0; sys_platform != 'darwin' opennsfw2==0.10.2 protobuf==4.23.2 -tqdm==4.65.0 \ No newline at end of file +tqdm==4.65.0 +gfpgan==1.3.8 \ No newline at end of file diff --git a/roop/__init__.py b/roop/__init__.py index 8d1c8b69c..e69de29bb 100644 --- a/roop/__init__.py +++ b/roop/__init__.py @@ -1 +0,0 @@ - diff --git a/roop/analyser.py b/roop/analyser.py deleted file mode 100644 index 804f7a8d9..000000000 --- a/roop/analyser.py +++ /dev/null @@ -1,27 +0,0 @@ -import insightface -import roop.globals - -FACE_ANALYSER = None - - -def get_face_analyser(): - global FACE_ANALYSER - if FACE_ANALYSER is None: - FACE_ANALYSER = insightface.app.FaceAnalysis(name='buffalo_l', providers=roop.globals.providers) - FACE_ANALYSER.prepare(ctx_id=0, det_size=(640, 640)) - return FACE_ANALYSER - - -def get_face_single(img_data): - face = get_face_analyser().get(img_data) - try: - return sorted(face, key=lambda x: x.bbox[0])[0] - except IndexError: - return None - - -def get_face_many(img_data): - try: - return get_face_analyser().get(img_data) - except IndexError: - return None diff --git a/roop/capturer.py b/roop/capturer.py new file mode 100644 index 000000000..fd49d468d --- /dev/null +++ b/roop/capturer.py @@ -0,0 +1,20 @@ +from typing import Any +import cv2 + + +def get_video_frame(video_path: str, frame_number: int = 0) -> Any: + capture = cv2.VideoCapture(video_path) + frame_total = capture.get(cv2.CAP_PROP_FRAME_COUNT) + capture.set(cv2.CAP_PROP_POS_FRAMES, min(frame_total, frame_number - 1)) + has_frame, frame = capture.read() + capture.release() + if has_frame: + return frame + return None + + +def get_video_frame_total(video_path: str) -> int: + capture = cv2.VideoCapture(video_path) + video_frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT)) + capture.release() + return video_frame_total diff --git a/roop/core.py b/roop/core.py index f09674b86..050d1377e 100755 --- a/roop/core.py +++ b/roop/core.py @@ -2,78 +2,136 @@ import os import sys -# single thread doubles performance of gpu-mode - needs to be set before torch import -if any(arg.startswith('--gpu-vendor') for arg in sys.argv): +# single thread doubles cuda performance - needs to be set before torch import +if any(arg.startswith('--execution-provider') for arg in sys.argv): os.environ['OMP_NUM_THREADS'] = '1' +# reduce tensorflow log level +os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' +import warnings +from typing import List import platform import signal import shutil -import glob import argparse -import psutil import torch +import onnxruntime import tensorflow -from pathlib import Path -import multiprocessing as mp -from opennsfw2 import predict_video_frames, predict_image -import cv2 import roop.globals -from roop.swapper import process_video, process_img, process_faces, process_frames -from roop.utils import is_img, detect_fps, set_fps, create_video, add_audio, extract_frames, rreplace -from roop.analyser import get_face_single +import roop.metadata import roop.ui as ui - -signal.signal(signal.SIGINT, lambda signal_number, frame: quit()) -parser = argparse.ArgumentParser() -parser.add_argument('-f', '--face', help='use this face', dest='source_img') -parser.add_argument('-t', '--target', help='replace this face', dest='target_path') -parser.add_argument('-o', '--output', help='save output to this file', dest='output_file') -parser.add_argument('--keep-fps', help='maintain original fps', dest='keep_fps', action='store_true', default=False) -parser.add_argument('--keep-frames', help='keep frames directory', dest='keep_frames', action='store_true', default=False) -parser.add_argument('--all-faces', help='swap all faces in frame', dest='all_faces', action='store_true', default=False) -parser.add_argument('--max-memory', help='maximum amount of RAM in GB to be used', dest='max_memory', type=int) -parser.add_argument('--cpu-cores', help='number of CPU cores to use', dest='cpu_cores', type=int, default=max(psutil.cpu_count() / 2, 1)) -parser.add_argument('--gpu-threads', help='number of threads to be use for the GPU', dest='gpu_threads', type=int, default=8) -parser.add_argument('--gpu-vendor', help='choice your GPU vendor', dest='gpu_vendor', choices=['apple', 'amd', 'intel', 'nvidia']) - -args = parser.parse_known_args()[0] - -if 'all_faces' in args: - roop.globals.all_faces = True - -if args.cpu_cores: - roop.globals.cpu_cores = int(args.cpu_cores) - -# cpu thread fix for mac -if sys.platform == 'darwin': - roop.globals.cpu_cores = 1 - -if args.gpu_threads: - roop.globals.gpu_threads = int(args.gpu_threads) - -# gpu thread fix for amd -if args.gpu_vendor == 'amd': - roop.globals.gpu_threads = 1 - -if args.gpu_vendor: - roop.globals.gpu_vendor = args.gpu_vendor -else: - roop.globals.providers = ['CPUExecutionProvider'] - -sep = "/" -if os.name == "nt": - sep = "\\" - - -def limit_resources(): +from roop.predicter import predict_image, predict_video +from roop.processors.frame.core import get_frame_processors_modules +from roop.utilities import has_image_extension, is_image, is_video, detect_fps, create_video, extract_frames, get_temp_frame_paths, restore_audio, create_temp, move_temp, clean_temp, normalize_output_path + +if 'ROCMExecutionProvider' in roop.globals.execution_providers: + del torch + +warnings.filterwarnings('ignore', category=FutureWarning, module='insightface') +warnings.filterwarnings('ignore', category=UserWarning, module='torchvision') + + +def parse_args() -> None: + signal.signal(signal.SIGINT, lambda signal_number, frame: destroy()) + program = argparse.ArgumentParser() + program.add_argument('-s', '--source', help='select an source image', dest='source_path') + program.add_argument('-t', '--target', help='select an target image or video', dest='target_path') + program.add_argument('-o', '--output', help='select output file or directory', dest='output_path') + program.add_argument('--frame-processor', help='pipeline of frame processors', dest='frame_processor', default=['face_swapper'], choices=['face_swapper', 'face_enhancer'], nargs='+') + program.add_argument('--keep-fps', help='keep original fps', dest='keep_fps', action='store_true', default=False) + program.add_argument('--keep-audio', help='keep original audio', dest='keep_audio', action='store_true', default=True) + program.add_argument('--keep-frames', help='keep temporary frames', dest='keep_frames', action='store_true', default=False) + program.add_argument('--many-faces', help='process every face', dest='many_faces', action='store_true', default=False) + program.add_argument('--video-encoder', help='adjust output video encoder', dest='video_encoder', default='libx264', choices=['libx264', 'libx265', 'libvpx-vp9']) + program.add_argument('--video-quality', help='adjust output video quality', dest='video_quality', type=int, default=18) + program.add_argument('--max-memory', help='maximum amount of RAM in GB', dest='max_memory', type=int, default=suggest_max_memory()) + program.add_argument('--execution-provider', help='execution provider', dest='execution_provider', default=['cpu'], choices=suggest_execution_providers(), nargs='+') + program.add_argument('--execution-threads', help='number of execution threads', dest='execution_threads', type=int, default=suggest_execution_threads()) + program.add_argument('-v', '--version', action='version', version=f'{roop.metadata.name} {roop.metadata.version}') + + # register deprecated args + program.add_argument('-f', '--face', help=argparse.SUPPRESS, dest='source_path_deprecated') + program.add_argument('--cpu-cores', help=argparse.SUPPRESS, dest='cpu_cores_deprecated', type=int) + program.add_argument('--gpu-vendor', help=argparse.SUPPRESS, dest='gpu_vendor_deprecated') + program.add_argument('--gpu-threads', help=argparse.SUPPRESS, dest='gpu_threads_deprecated', type=int) + + args = program.parse_args() + + roop.globals.source_path = args.source_path + roop.globals.target_path = args.target_path + roop.globals.output_path = normalize_output_path(roop.globals.source_path, roop.globals.target_path, args.output_path) + roop.globals.frame_processors = args.frame_processor + roop.globals.headless = args.source_path or args.target_path or args.output_path + roop.globals.keep_fps = args.keep_fps + roop.globals.keep_audio = args.keep_audio + roop.globals.keep_frames = args.keep_frames + roop.globals.many_faces = args.many_faces + roop.globals.video_encoder = args.video_encoder + roop.globals.video_quality = args.video_quality + roop.globals.max_memory = args.max_memory + roop.globals.execution_providers = decode_execution_providers(args.execution_provider) + roop.globals.execution_threads = args.execution_threads + + # translate deprecated args + if args.source_path_deprecated: + print('\033[33mArgument -f and --face are deprecated. Use -s and --source instead.\033[0m') + roop.globals.source_path = args.source_path_deprecated + roop.globals.output_path = normalize_output_path(args.source_path_deprecated, roop.globals.target_path, args.output_path) + if args.cpu_cores_deprecated: + print('\033[33mArgument --cpu-cores is deprecated. Use --execution-threads instead.\033[0m') + roop.globals.execution_threads = args.cpu_cores_deprecated + if args.gpu_vendor_deprecated == 'apple': + print('\033[33mArgument --gpu-vendor apple is deprecated. Use --execution-provider coreml instead.\033[0m') + roop.globals.execution_providers = decode_execution_providers(['coreml']) + if args.gpu_vendor_deprecated == 'nvidia': + print('\033[33mArgument --gpu-vendor nvidia is deprecated. Use --execution-provider cuda instead.\033[0m') + roop.globals.execution_providers = decode_execution_providers(['cuda']) + if args.gpu_vendor_deprecated == 'amd': + print('\033[33mArgument --gpu-vendor amd is deprecated. Use --execution-provider cuda instead.\033[0m') + roop.globals.execution_providers = decode_execution_providers(['rocm']) + if args.gpu_threads_deprecated: + print('\033[33mArgument --gpu-threads is deprecated. Use --execution-threads instead.\033[0m') + roop.globals.execution_threads = args.gpu_threads_deprecated + + +def encode_execution_providers(execution_providers: List[str]) -> List[str]: + return [execution_provider.replace('ExecutionProvider', '').lower() for execution_provider in execution_providers] + + +def decode_execution_providers(execution_providers: List[str]) -> List[str]: + return [provider for provider, encoded_execution_provider in zip(onnxruntime.get_available_providers(), encode_execution_providers(onnxruntime.get_available_providers())) + if any(execution_provider in encoded_execution_provider for execution_provider in execution_providers)] + + +def suggest_max_memory() -> int: + if platform.system().lower() == 'darwin': + return 4 + return 16 + + +def suggest_execution_providers() -> List[str]: + return encode_execution_providers(onnxruntime.get_available_providers()) + + +def suggest_execution_threads() -> int: + if 'DmlExecutionProvider' in roop.globals.execution_providers: + return 1 + if 'ROCMExecutionProvider' in roop.globals.execution_providers: + return 2 + return 8 + + +def limit_resources() -> None: # prevent tensorflow memory leak gpus = tensorflow.config.experimental.list_physical_devices('GPU') for gpu in gpus: tensorflow.config.experimental.set_memory_growth(gpu, True) - if args.max_memory: - memory = args.max_memory * 1024 * 1024 * 1024 - if str(platform.system()).lower() == 'windows': + # limit memory usage + if roop.globals.max_memory: + memory = roop.globals.max_memory * 1024 ** 3 + if platform.system().lower() == 'darwin': + memory = roop.globals.max_memory * 1024 ** 6 + if platform.system().lower() == 'windows': import ctypes kernel32 = ctypes.windll.kernel32 kernel32.SetProcessWorkingSetSize(-1, ctypes.c_size_t(memory), ctypes.c_size_t(memory)) @@ -82,199 +140,99 @@ def limit_resources(): resource.setrlimit(resource.RLIMIT_DATA, (memory, memory)) -def pre_check(): +def release_resources() -> None: + if 'CUDAExecutionProvider' in roop.globals.execution_providers: + torch.cuda.empty_cache() + + +def pre_check() -> bool: if sys.version_info < (3, 9): - quit('Python version is not supported - please upgrade to 3.9 or higher') + update_status('Python version is not supported - please upgrade to 3.9 or higher.') + return False if not shutil.which('ffmpeg'): - quit('ffmpeg is not installed!') - model_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), '../inswapper_128.onnx') - if not os.path.isfile(model_path): - quit('File "inswapper_128.onnx" does not exist!') - if roop.globals.gpu_vendor == 'apple': - if 'CoreMLExecutionProvider' not in roop.globals.providers: - quit("You are using --gpu=apple flag but CoreML isn't available or properly installed on your system.") - if roop.globals.gpu_vendor == 'amd': - if 'ROCMExecutionProvider' not in roop.globals.providers: - quit("You are using --gpu=amd flag but ROCM isn't available or properly installed on your system.") - if roop.globals.gpu_vendor == 'nvidia': - CUDA_VERSION = torch.version.cuda - CUDNN_VERSION = torch.backends.cudnn.version() - if not torch.cuda.is_available(): - quit("You are using --gpu=nvidia flag but CUDA isn't available or properly installed on your system.") - if CUDA_VERSION > '11.8': - quit(f"CUDA version {CUDA_VERSION} is not supported - please downgrade to 11.8") - if CUDA_VERSION < '11.4': - quit(f"CUDA version {CUDA_VERSION} is not supported - please upgrade to 11.8") - if CUDNN_VERSION < 8220: - quit(f"CUDNN version {CUDNN_VERSION} is not supported - please upgrade to 8.9.1") - if CUDNN_VERSION > 8910: - quit(f"CUDNN version {CUDNN_VERSION} is not supported - please downgrade to 8.9.1") - - -def get_video_frame(video_path, frame_number = 1): - cap = cv2.VideoCapture(video_path) - amount_of_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT) - cap.set(cv2.CAP_PROP_POS_FRAMES, min(amount_of_frames, frame_number-1)) - if not cap.isOpened(): - print("Error opening video file") + update_status('ffmpeg is not installed.') + return False + return True + + +def update_status(message: str, scope: str = 'ROOP.CORE') -> None: + print(f'[{scope}] {message}') + if not roop.globals.headless: + ui.update_status(message) + + +def start() -> None: + for frame_processor in get_frame_processors_modules(roop.globals.frame_processors): + if not frame_processor.pre_start(): + return + # process image to image + if has_image_extension(roop.globals.target_path): + if predict_image(roop.globals.target_path): + destroy() + shutil.copy2(roop.globals.target_path, roop.globals.output_path) + for frame_processor in get_frame_processors_modules(roop.globals.frame_processors): + update_status('Progressing...', frame_processor.NAME) + frame_processor.process_image(roop.globals.source_path, roop.globals.output_path, roop.globals.output_path) + release_resources() + if is_image(roop.globals.target_path): + update_status('Processing to image succeed!') + else: + update_status('Processing to image failed!') return - ret, frame = cap.read() - if ret: - return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) - - cap.release() - - -def preview_video(video_path): - cap = cv2.VideoCapture(video_path) - if not cap.isOpened(): - print("Error opening video file") - return 0 - amount_of_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT) - ret, frame = cap.read() - if ret: - frame = get_video_frame(video_path) - - cap.release() - return (amount_of_frames, frame) - - -def status(string): - value = "Status: " + string - if 'cli_mode' in args: - print(value) + # process image to videos + if predict_video(roop.globals.target_path): + destroy() + update_status('Creating temp resources...') + create_temp(roop.globals.target_path) + update_status('Extracting frames...') + extract_frames(roop.globals.target_path) + temp_frame_paths = get_temp_frame_paths(roop.globals.target_path) + for frame_processor in get_frame_processors_modules(roop.globals.frame_processors): + update_status('Progressing...', frame_processor.NAME) + frame_processor.process_video(roop.globals.source_path, temp_frame_paths) + release_resources() + # handles fps + if roop.globals.keep_fps: + update_status('Detecting fps...') + fps = detect_fps(roop.globals.target_path) + update_status(f'Creating video with {fps} fps...') + create_video(roop.globals.target_path, fps) else: - ui.update_status_label(value) - - -def process_video_multi_cores(source_img, frame_paths): - n = len(frame_paths) // roop.globals.cpu_cores - if n > 2: - processes = [] - for i in range(0, len(frame_paths), n): - p = POOL.apply_async(process_video, args=(source_img, frame_paths[i:i + n],)) - processes.append(p) - for p in processes: - p.get() - POOL.close() - POOL.join() - - -def start(preview_callback = None): - if not args.source_img or not os.path.isfile(args.source_img): - print("\n[WARNING] Please select an image containing a face.") - return - elif not args.target_path or not os.path.isfile(args.target_path): - print("\n[WARNING] Please select a video/image to swap face in.") - return - if not args.output_file: - target_path = args.target_path - args.output_file = rreplace(target_path, "/", "/swapped-", 1) if "/" in target_path else "swapped-" + target_path - target_path = args.target_path - test_face = get_face_single(cv2.imread(args.source_img)) - if not test_face: - print("\n[WARNING] No face detected in source image. Please try with another one.\n") - return - if is_img(target_path): - if predict_image(target_path) > 0.85: - quit() - process_img(args.source_img, target_path, args.output_file) - status("swap successful!") - return - seconds, probabilities = predict_video_frames(video_path=args.target_path, frame_interval=100) - if any(probability > 0.85 for probability in probabilities): - quit() - video_name_full = target_path.split("/")[-1] - video_name = os.path.splitext(video_name_full)[0] - output_dir = os.path.dirname(target_path) + "/" + video_name if os.path.dirname(target_path) else video_name - Path(output_dir).mkdir(exist_ok=True) - status("detecting video's FPS...") - fps, exact_fps = detect_fps(target_path) - if not args.keep_fps and fps > 30: - this_path = output_dir + "/" + video_name + ".mp4" - set_fps(target_path, this_path, 30) - target_path, exact_fps = this_path, 30 + update_status('Creating video with 30.0 fps...') + create_video(roop.globals.target_path) + # handle audio + if roop.globals.keep_audio: + if roop.globals.keep_fps: + update_status('Restoring audio...') + else: + update_status('Restoring audio might cause issues as fps are not kept...') + restore_audio(roop.globals.target_path, roop.globals.output_path) else: - shutil.copy(target_path, output_dir) - status("extracting frames...") - extract_frames(target_path, output_dir) - args.frame_paths = tuple(sorted( - glob.glob(output_dir + "/*.png"), - key=lambda x: int(x.split(sep)[-1].replace(".png", "")) - )) - status("swapping in progress...") - if roop.globals.gpu_vendor is None and roop.globals.cpu_cores > 1: - global POOL - POOL = mp.Pool(roop.globals.cpu_cores) - process_video_multi_cores(args.source_img, args.frame_paths) + move_temp(roop.globals.target_path, roop.globals.output_path) + # clean and validate + clean_temp(roop.globals.target_path) + if is_video(roop.globals.target_path): + update_status('Processing to video succeed!') else: - process_video(args.source_img, args.frame_paths) - status("creating video...") - create_video(video_name, exact_fps, output_dir) - status("adding audio...") - add_audio(output_dir, target_path, video_name_full, args.keep_frames, args.output_file) - save_path = args.output_file if args.output_file else output_dir + "/" + video_name + ".mp4" - print("\n\nVideo saved as:", save_path, "\n\n") - status("swap successful!") - - -def select_face_handler(path: str): - args.source_img = path - - -def select_target_handler(path: str): - args.target_path = path - return preview_video(args.target_path) + update_status('Processing to video failed!') -def toggle_all_faces_handler(value: int): - roop.globals.all_faces = True if value == 1 else False +def destroy() -> None: + if roop.globals.target_path: + clean_temp(roop.globals.target_path) + quit() -def toggle_fps_limit_handler(value: int): - args.keep_fps = int(value != 1) - - -def toggle_keep_frames_handler(value: int): - args.keep_frames = value - - -def save_file_handler(path: str): - args.output_file = path - - -def create_test_preview(frame_number): - return process_faces( - get_face_single(cv2.imread(args.source_img)), - get_video_frame(args.target_path, frame_number) - ) - - -def run(): - global all_faces, keep_frames, limit_fps - - pre_check() +def run() -> None: + parse_args() + if not pre_check(): + return + for frame_processor in get_frame_processors_modules(roop.globals.frame_processors): + if not frame_processor.pre_check(): + return limit_resources() - if args.source_img: - args.cli_mode = True + if roop.globals.headless: start() - quit() - - window = ui.init( - { - 'all_faces': roop.globals.all_faces, - 'keep_fps': args.keep_fps, - 'keep_frames': args.keep_frames - }, - select_face_handler, - select_target_handler, - toggle_all_faces_handler, - toggle_fps_limit_handler, - toggle_keep_frames_handler, - save_file_handler, - start, - get_video_frame, - create_test_preview - ) - - window.mainloop() + else: + window = ui.init(start, destroy) + window.mainloop() diff --git a/roop/face_analyser.py b/roop/face_analyser.py new file mode 100644 index 000000000..ba7803ecf --- /dev/null +++ b/roop/face_analyser.py @@ -0,0 +1,31 @@ +from typing import Any +import insightface + +import roop.globals +from roop.typing import Frame + +FACE_ANALYSER = None + + +def get_face_analyser() -> Any: + global FACE_ANALYSER + + if FACE_ANALYSER is None: + FACE_ANALYSER = insightface.app.FaceAnalysis(name='buffalo_l', providers=roop.globals.execution_providers) + FACE_ANALYSER.prepare(ctx_id=0, det_size=(640, 640)) + return FACE_ANALYSER + + +def get_one_face(frame: Frame) -> Any: + face = get_face_analyser().get(frame) + try: + return min(face, key=lambda x: x.bbox[0]) + except ValueError: + return None + + +def get_many_faces(frame: Frame) -> Any: + try: + return get_face_analyser().get(frame) + except IndexError: + return None diff --git a/roop/globals.py b/roop/globals.py index 986bf919c..77fd391db 100644 --- a/roop/globals.py +++ b/roop/globals.py @@ -1,11 +1,17 @@ -import onnxruntime +from typing import List -all_faces = None +source_path = None +target_path = None +output_path = None +frame_processors: List[str] = [] +keep_fps = None +keep_audio = None +keep_frames = None +many_faces = None +video_encoder = None +video_quality = None +max_memory = None +execution_providers: List[str] = [] +execution_threads = None +headless = None log_level = 'error' -cpu_cores = None -gpu_threads = None -gpu_vendor = None -providers = onnxruntime.get_available_providers() - -if 'TensorrtExecutionProvider' in providers: - providers.remove('TensorrtExecutionProvider') diff --git a/roop/metadata.py b/roop/metadata.py new file mode 100644 index 000000000..0519049a7 --- /dev/null +++ b/roop/metadata.py @@ -0,0 +1,2 @@ +name = 'roop' +version = '1.0.0' diff --git a/roop/predicter.py b/roop/predicter.py new file mode 100644 index 000000000..7ebc2b62e --- /dev/null +++ b/roop/predicter.py @@ -0,0 +1,25 @@ +import numpy +import opennsfw2 +from PIL import Image + +from roop.typing import Frame + +MAX_PROBABILITY = 0.85 + + +def predict_frame(target_frame: Frame) -> bool: + image = Image.fromarray(target_frame) + image = opennsfw2.preprocess_image(image, opennsfw2.Preprocessing.YAHOO) + model = opennsfw2.make_open_nsfw_model() + views = numpy.expand_dims(image, axis=0) + _, probability = model.predict(views)[0] + return probability > MAX_PROBABILITY + + +def predict_image(target_path: str) -> bool: + return opennsfw2.predict_image(target_path) > MAX_PROBABILITY + + +def predict_video(target_path: str) -> bool: + _, probabilities = opennsfw2.predict_video_frames(video_path=target_path, frame_interval=100) + return any(probability > MAX_PROBABILITY for probability in probabilities) diff --git a/roop/processors/__init__.py b/roop/processors/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/roop/processors/frame/__init__.py b/roop/processors/frame/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/roop/processors/frame/core.py b/roop/processors/frame/core.py new file mode 100644 index 000000000..a07a9a6ba --- /dev/null +++ b/roop/processors/frame/core.py @@ -0,0 +1,56 @@ +import sys +import importlib +from concurrent.futures import ThreadPoolExecutor +from types import ModuleType +from typing import Any, List, Callable +from tqdm import tqdm + +import roop + +FRAME_PROCESSORS_MODULES: List[ModuleType] = [] +FRAME_PROCESSORS_INTERFACE = [ + 'pre_check', + 'pre_start', + 'process_frame', + 'process_image', + 'process_video' +] + + +def load_frame_processor_module(frame_processor: str) -> Any: + try: + frame_processor_module = importlib.import_module(f'roop.processors.frame.{frame_processor}') + for method_name in FRAME_PROCESSORS_INTERFACE: + if not hasattr(frame_processor_module, method_name): + sys.exit() + except ImportError: + sys.exit() + return frame_processor_module + + +def get_frame_processors_modules(frame_processors: List[str]) -> List[ModuleType]: + global FRAME_PROCESSORS_MODULES + + if not FRAME_PROCESSORS_MODULES: + for frame_processor in frame_processors: + frame_processor_module = load_frame_processor_module(frame_processor) + FRAME_PROCESSORS_MODULES.append(frame_processor_module) + return FRAME_PROCESSORS_MODULES + + +def multi_process_frame(source_path: str, temp_frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None], progress: Any = None) -> None: + with ThreadPoolExecutor(max_workers=roop.globals.execution_threads) as executor: + futures = [] + for path in temp_frame_paths: + future = executor.submit(process_frames, source_path, [path], progress) + futures.append(future) + for future in futures: + future.result() + + +def process_video(source_path: str, frame_paths: list[str], process_frames: Callable[[str, List[str], Any], None]) -> None: + progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]' + total = len(frame_paths) + with tqdm(total=total, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress: + progress.set_postfix({'execution_providers': roop.globals.execution_providers, 'threads': roop.globals.execution_threads, 'memory': roop.globals.max_memory}) + multi_process_frame(source_path, frame_paths, process_frames, progress) diff --git a/roop/processors/frame/face_enhancer.py b/roop/processors/frame/face_enhancer.py new file mode 100644 index 000000000..50c7c5468 --- /dev/null +++ b/roop/processors/frame/face_enhancer.py @@ -0,0 +1,75 @@ +from typing import Any, List +import cv2 +import threading +import gfpgan + +import roop.globals +import roop.processors.frame.core +from roop.core import update_status +from roop.face_analyser import get_one_face +from roop.typing import Frame, Face +from roop.utilities import conditional_download, resolve_relative_path, is_image, is_video + +FACE_ENHANCER = None +THREAD_SEMAPHORE = threading.Semaphore() +THREAD_LOCK = threading.Lock() +NAME = 'ROOP.FACE-ENHANCER' + + +def pre_check() -> bool: + download_directory_path = resolve_relative_path('../models') + conditional_download(download_directory_path, ['https://huggingface.co/henryruhs/roop/resolve/main/GFPGANv1.4.pth']) + return True + + +def pre_start() -> bool: + if not is_image(roop.globals.target_path) and not is_video(roop.globals.target_path): + update_status('Select an image or video for target path.', NAME) + return False + return True + + +def get_face_enhancer() -> Any: + global FACE_ENHANCER + + with THREAD_LOCK: + if FACE_ENHANCER is None: + model_path = resolve_relative_path('../models/GFPGANv1.4.pth') + # todo: set models path https://github.com/TencentARC/GFPGAN/issues/399 + FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1) # type: ignore[attr-defined] + return FACE_ENHANCER + + +def enhance_face(temp_frame: Frame) -> Frame: + with THREAD_SEMAPHORE: + _, _, temp_frame = get_face_enhancer().enhance( + temp_frame, + paste_back=True + ) + return temp_frame + + +def process_frame(source_face: Face, temp_frame: Frame) -> Frame: + target_face = get_one_face(temp_frame) + if target_face: + temp_frame = enhance_face(temp_frame) + return temp_frame + + +def process_frames(source_path: str, temp_frame_paths: List[str], progress: Any = None) -> None: + for temp_frame_path in temp_frame_paths: + temp_frame = cv2.imread(temp_frame_path) + result = process_frame(None, temp_frame) + cv2.imwrite(temp_frame_path, result) + if progress: + progress.update(1) + + +def process_image(source_path: str, target_path: str, output_path: str) -> None: + target_frame = cv2.imread(target_path) + result = process_frame(None, target_frame) + cv2.imwrite(output_path, result) + + +def process_video(source_path: str, temp_frame_paths: List[str]) -> None: + roop.processors.frame.core.process_video(None, temp_frame_paths, process_frames) diff --git a/roop/processors/frame/face_swapper.py b/roop/processors/frame/face_swapper.py new file mode 100644 index 000000000..35063d2f9 --- /dev/null +++ b/roop/processors/frame/face_swapper.py @@ -0,0 +1,86 @@ +from typing import Any, List +import cv2 +import insightface +import threading + +import roop.globals +import roop.processors.frame.core +from roop.core import update_status +from roop.face_analyser import get_one_face, get_many_faces +from roop.typing import Face, Frame +from roop.utilities import conditional_download, resolve_relative_path, is_image, is_video + +FACE_SWAPPER = None +THREAD_LOCK = threading.Lock() +NAME = 'ROOP.FACE-SWAPPER' + + +def pre_check() -> bool: + download_directory_path = resolve_relative_path('../models') + conditional_download(download_directory_path, ['https://huggingface.co/henryruhs/roop/resolve/main/inswapper_128.onnx']) + return True + + +def pre_start() -> bool: + if not is_image(roop.globals.source_path): + update_status('Select an image for source path.', NAME) + return False + elif not get_one_face(cv2.imread(roop.globals.source_path)): + update_status('No face in source path detected.', NAME) + return False + if not is_image(roop.globals.target_path) and not is_video(roop.globals.target_path): + update_status('Select an image or video for target path.', NAME) + return False + return True + + +def get_face_swapper() -> Any: + global FACE_SWAPPER + + with THREAD_LOCK: + if FACE_SWAPPER is None: + model_path = resolve_relative_path('../models/inswapper_128.onnx') + FACE_SWAPPER = insightface.model_zoo.get_model(model_path, providers=roop.globals.execution_providers) + return FACE_SWAPPER + + +def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame: + return get_face_swapper().get(temp_frame, target_face, source_face, paste_back=True) + + +def process_frame(source_face: Face, temp_frame: Frame) -> Frame: + if roop.globals.many_faces: + many_faces = get_many_faces(temp_frame) + if many_faces: + for target_face in many_faces: + temp_frame = swap_face(source_face, target_face, temp_frame) + else: + target_face = get_one_face(temp_frame) + if target_face: + temp_frame = swap_face(source_face, target_face, temp_frame) + return temp_frame + + +def process_frames(source_path: str, temp_frame_paths: List[str], progress: Any = None) -> None: + source_face = get_one_face(cv2.imread(source_path)) + for temp_frame_path in temp_frame_paths: + temp_frame = cv2.imread(temp_frame_path) + try: + result = process_frame(source_face, temp_frame) + cv2.imwrite(temp_frame_path, result) + except Exception as exception: + print(exception) + pass + if progress: + progress.update(1) + + +def process_image(source_path: str, target_path: str, output_path: str) -> None: + source_face = get_one_face(cv2.imread(source_path)) + target_frame = cv2.imread(target_path) + result = process_frame(source_face, target_frame) + cv2.imwrite(output_path, result) + + +def process_video(source_path: str, temp_frame_paths: List[str]) -> None: + roop.processors.frame.core.process_video(source_path, temp_frame_paths, process_frames) diff --git a/roop/swapper.py b/roop/swapper.py deleted file mode 100644 index 81a6b1d77..000000000 --- a/roop/swapper.py +++ /dev/null @@ -1,96 +0,0 @@ - -import os -from tqdm import tqdm -import cv2 -import insightface -import threading -import roop.globals -from roop.analyser import get_face_single, get_face_many - -FACE_SWAPPER = None -THREAD_LOCK = threading.Lock() - - -def get_face_swapper(): - global FACE_SWAPPER - with THREAD_LOCK: - if FACE_SWAPPER is None: - model_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), '../inswapper_128.onnx') - FACE_SWAPPER = insightface.model_zoo.get_model(model_path, providers=roop.globals.providers) - return FACE_SWAPPER - - -def swap_face_in_frame(source_face, target_face, frame): - if target_face: - return get_face_swapper().get(frame, target_face, source_face, paste_back=True) - return frame - - -def process_faces(source_face, target_frame): - if roop.globals.all_faces: - many_faces = get_face_many(target_frame) - if many_faces: - for face in many_faces: - target_frame = swap_face_in_frame(source_face, face, target_frame) - else: - face = get_face_single(target_frame) - if face: - target_frame = swap_face_in_frame(source_face, face, target_frame) - return target_frame - - -def process_frames(source_img, frame_paths, progress=None): - source_face = get_face_single(cv2.imread(source_img)) - for frame_path in frame_paths: - frame = cv2.imread(frame_path) - try: - result = process_faces(source_face, frame) - cv2.imwrite(frame_path, result) - except Exception as exception: - print(exception) - pass - if progress: - progress.update(1) - - -def multi_process_frame(source_img, frame_paths, progress): - threads = [] - num_threads = roop.globals.gpu_threads - num_frames_per_thread = len(frame_paths) // num_threads - remaining_frames = len(frame_paths) % num_threads - - # create thread and launch - start_index = 0 - for _ in range(num_threads): - end_index = start_index + num_frames_per_thread - if remaining_frames > 0: - end_index += 1 - remaining_frames -= 1 - thread_frame_paths = frame_paths[start_index:end_index] - thread = threading.Thread(target=process_frames, args=(source_img, thread_frame_paths, progress)) - threads.append(thread) - thread.start() - start_index = end_index - - # threading - for thread in threads: - thread.join() - - -def process_img(source_img, target_path, output_file): - frame = cv2.imread(target_path) - face = get_face_single(frame) - source_face = get_face_single(cv2.imread(source_img)) - result = get_face_swapper().get(frame, face, source_face, paste_back=True) - cv2.imwrite(output_file, result) - print("\n\nImage saved as:", output_file, "\n\n") - - -def process_video(source_img, frame_paths): - do_multi = roop.globals.gpu_vendor is not None and roop.globals.gpu_threads > 1 - progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]' - with tqdm(total=len(frame_paths), desc="Processing", unit="frame", dynamic_ncols=True, bar_format=progress_bar_format) as progress: - if do_multi: - multi_process_frame(source_img, frame_paths, progress) - else: - process_frames(source_img, frame_paths, progress) diff --git a/roop/typing.py b/roop/typing.py new file mode 100644 index 000000000..1cff74406 --- /dev/null +++ b/roop/typing.py @@ -0,0 +1,7 @@ +from typing import Any + +from insightface.app.common import Face +import numpy + +Face = Face +Frame = numpy.ndarray[Any, Any] diff --git a/roop/ui.json b/roop/ui.json new file mode 100644 index 000000000..bc0314352 --- /dev/null +++ b/roop/ui.json @@ -0,0 +1,155 @@ +{ + "CTk": { + "fg_color": ["gray95", "gray10"] + }, + "CTkToplevel": { + "fg_color": ["gray95", "gray10"] + }, + "CTkFrame": { + "corner_radius": 6, + "border_width": 0, + "fg_color": ["gray90", "gray13"], + "top_fg_color": ["gray85", "gray16"], + "border_color": ["gray65", "gray28"] + }, + "CTkButton": { + "corner_radius": 6, + "border_width": 0, + "fg_color": ["#3a7ebf", "#1f538d"], + "hover_color": ["#325882", "#14375e"], + "border_color": ["#3E454A", "#949A9F"], + "text_color": ["#DCE4EE", "#DCE4EE"], + "text_color_disabled": ["gray74", "gray60"] + }, + "CTkLabel": { + "corner_radius": 0, + "fg_color": "transparent", + "text_color": ["gray14", "gray84"] + }, + "CTkEntry": { + "corner_radius": 6, + "border_width": 2, + "fg_color": ["#F9F9FA", "#343638"], + "border_color": ["#979DA2", "#565B5E"], + "text_color": ["gray14", "gray84"], + "placeholder_text_color": ["gray52", "gray62"] + }, + "CTkCheckbox": { + "corner_radius": 6, + "border_width": 3, + "fg_color": ["#3a7ebf", "#1f538d"], + "border_color": ["#3E454A", "#949A9F"], + "hover_color": ["#325882", "#14375e"], + "checkmark_color": ["#DCE4EE", "gray90"], + "text_color": ["gray14", "gray84"], + "text_color_disabled": ["gray60", "gray45"] + }, + "CTkSwitch": { + "corner_radius": 1000, + "border_width": 3, + "button_length": 0, + "fg_color": ["#939BA2", "#4A4D50"], + "progress_color": ["#3a7ebf", "#1f538d"], + "button_color": ["gray36", "#D5D9DE"], + "button_hover_color": ["gray20", "gray100"], + "text_color": ["gray14", "gray84"], + "text_color_disabled": ["gray60", "gray45"] + }, + "CTkRadiobutton": { + "corner_radius": 1000, + "border_width_checked": 6, + "border_width_unchecked": 3, + "fg_color": ["#3a7ebf", "#1f538d"], + "border_color": ["#3E454A", "#949A9F"], + "hover_color": ["#325882", "#14375e"], + "text_color": ["gray14", "gray84"], + "text_color_disabled": ["gray60", "gray45"] + }, + "CTkProgressBar": { + "corner_radius": 1000, + "border_width": 0, + "fg_color": ["#939BA2", "#4A4D50"], + "progress_color": ["#3a7ebf", "#1f538d"], + "border_color": ["gray", "gray"] + }, + "CTkSlider": { + "corner_radius": 1000, + "button_corner_radius": 1000, + "border_width": 6, + "button_length": 0, + "fg_color": ["#939BA2", "#4A4D50"], + "progress_color": ["gray40", "#AAB0B5"], + "button_color": ["#3a7ebf", "#1f538d"], + "button_hover_color": ["#325882", "#14375e"] + }, + "CTkOptionMenu": { + "corner_radius": 6, + "fg_color": ["#3a7ebf", "#1f538d"], + "button_color": ["#325882", "#14375e"], + "button_hover_color": ["#234567", "#1e2c40"], + "text_color": ["#DCE4EE", "#DCE4EE"], + "text_color_disabled": ["gray74", "gray60"] + }, + "CTkComboBox": { + "corner_radius": 6, + "border_width": 2, + "fg_color": ["#F9F9FA", "#343638"], + "border_color": ["#979DA2", "#565B5E"], + "button_color": ["#979DA2", "#565B5E"], + "button_hover_color": ["#6E7174", "#7A848D"], + "text_color": ["gray14", "gray84"], + "text_color_disabled": ["gray50", "gray45"] + }, + "CTkScrollbar": { + "corner_radius": 1000, + "border_spacing": 4, + "fg_color": "transparent", + "button_color": ["gray55", "gray41"], + "button_hover_color": ["gray40", "gray53"] + }, + "CTkSegmentedButton": { + "corner_radius": 6, + "border_width": 2, + "fg_color": ["#979DA2", "gray29"], + "selected_color": ["#3a7ebf", "#1f538d"], + "selected_hover_color": ["#325882", "#14375e"], + "unselected_color": ["#979DA2", "gray29"], + "unselected_hover_color": ["gray70", "gray41"], + "text_color": ["#DCE4EE", "#DCE4EE"], + "text_color_disabled": ["gray74", "gray60"] + }, + "CTkTextbox": { + "corner_radius": 6, + "border_width": 0, + "fg_color": ["gray100", "gray20"], + "border_color": ["#979DA2", "#565B5E"], + "text_color": ["gray14", "gray84"], + "scrollbar_button_color": ["gray55", "gray41"], + "scrollbar_button_hover_color": ["gray40", "gray53"] + }, + "CTkScrollableFrame": { + "label_fg_color": ["gray80", "gray21"] + }, + "DropdownMenu": { + "fg_color": ["gray90", "gray20"], + "hover_color": ["gray75", "gray28"], + "text_color": ["gray14", "gray84"] + }, + "CTkFont": { + "macOS": { + "family": "Avenir", + "size": 12, + "weight": "normal" + }, + "Windows": { + "family": "Corbel", + "size": 12, + "weight": "normal" + }, + "Linux": { + "family": "Montserrat", + "size": 12, + "weight": "normal" + } + } +} diff --git a/roop/ui.py b/roop/ui.py index d8891fba4..e36a160a5 100644 --- a/roop/ui.py +++ b/roop/ui.py @@ -1,315 +1,225 @@ -import tkinter as tk -from typing import Any, Callable, Tuple -from PIL import Image, ImageTk -import webbrowser -from tkinter import filedialog -from tkinter.filedialog import asksaveasfilename -import threading - -from roop.utils import is_img - -max_preview_size = 800 - - -def create_preview(parent): - global preview_image_frame, preview_frame_slider, test_button - - preview_window = tk.Toplevel(parent) - # Override close button - preview_window.protocol("WM_DELETE_WINDOW", hide_preview) - preview_window.withdraw() - preview_window.title("Preview") - preview_window.configure(bg="red") - preview_window.resizable(width=False, height=False) - - frame = tk.Frame(preview_window, background="#2d3436") - frame.pack(fill='both', side='left', expand='True') - - # Preview image - preview_image_frame = tk.Label(frame) - preview_image_frame.pack(side='top') - - # Bottom frame - buttons_frame = tk.Frame(frame, background="#2d3436") - buttons_frame.pack(fill='both', side='bottom') - - current_frame = tk.IntVar() - preview_frame_slider = tk.Scale( - buttons_frame, - from_=0, - to=0, - orient='horizontal', - variable=current_frame - ) - preview_frame_slider.pack(fill='both', side='left', expand='True') - - test_button = tk.Button(buttons_frame, text="Test", bg="#f1c40f", relief="flat", width=15, borderwidth=0, highlightthickness=0) - test_button.pack(side='right', fill='y') - return preview_window - - -def show_preview(): - preview.deiconify() - preview_visible.set(True) - - -def hide_preview(): - preview.withdraw() - preview_visible.set(False) +import os +import customtkinter as ctk +from typing import Callable, Tuple +import cv2 +from PIL import Image, ImageOps -def set_preview_handler(test_handler): - test_button.config(command = test_handler) +import roop.globals +import roop.metadata +from roop.face_analyser import get_one_face +from roop.capturer import get_video_frame, get_video_frame_total +from roop.predicter import predict_frame +from roop.processors.frame.core import get_frame_processors_modules +from roop.utilities import is_image, is_video, resolve_relative_path +ROOT = None +ROOT_HEIGHT = 700 +ROOT_WIDTH = 600 -def init_slider(frames_count, change_handler): - preview_frame_slider.configure(to=frames_count, command=lambda value: change_handler(preview_frame_slider.get())) - preview_frame_slider.set(0) +PREVIEW = None +PREVIEW_MAX_HEIGHT = 700 +PREVIEW_MAX_WIDTH = 1200 +RECENT_DIRECTORY_SOURCE = None +RECENT_DIRECTORY_TARGET = None +RECENT_DIRECTORY_OUTPUT = None -def update_preview(frame): - img = Image.fromarray(frame) - width, height = img.size - aspect_ratio = 1 - if width > height: - aspect_ratio = max_preview_size / width - else: - aspect_ratio = max_preview_size / height - img = img.resize( - ( - int(width * aspect_ratio), - int(height * aspect_ratio) - ), - Image.ANTIALIAS - ) - photo_img = ImageTk.PhotoImage(img) - preview_image_frame.configure(image=photo_img) - preview_image_frame.image = photo_img +preview_label = None +preview_slider = None +source_label = None +target_label = None +status_label = None + + +def init(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.CTk: + global ROOT, PREVIEW + ROOT = create_root(start, destroy) + PREVIEW = create_preview(ROOT) -def select_face(select_face_handler: Callable[[str], None]): - if select_face_handler: - path = filedialog.askopenfilename(title="Select a face") - preview_face(path) - return select_face_handler(path) - return None - + return ROOT -def update_slider_handler(get_video_frame, video_path): - return lambda frame_number: update_preview(get_video_frame(video_path, frame_number)) +def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.CTk: + global source_label, target_label, status_label -def test_preview(create_test_preview): - frame = create_test_preview(preview_frame_slider.get()) - update_preview(frame) + ctk.deactivate_automatic_dpi_awareness() + ctk.set_appearance_mode('system') + ctk.set_default_color_theme(resolve_relative_path('ui.json')) + root = ctk.CTk() + root.minsize(ROOT_WIDTH, ROOT_HEIGHT) + root.title(f'{roop.metadata.name} {roop.metadata.version}') + root.configure() + root.protocol('WM_DELETE_WINDOW', lambda: destroy()) + source_label = ctk.CTkLabel(root, text=None) + source_label.place(relx=0.1, rely=0.1, relwidth=0.3, relheight=0.25) -def update_slider(get_video_frame, create_test_preview, video_path, frames_amount): - init_slider(frames_amount, update_slider_handler(get_video_frame, video_path)) - set_preview_handler(lambda: preview_thread(lambda: test_preview(create_test_preview))) + target_label = ctk.CTkLabel(root, text=None) + target_label.place(relx=0.6, rely=0.1, relwidth=0.3, relheight=0.25) + source_button = ctk.CTkButton(root, text='Select a face', command=lambda: select_source_path()) + source_button.place(relx=0.1, rely=0.4, relwidth=0.3, relheight=0.1) -def analyze_target(select_target_handler: Callable[[str], Tuple[int, Any]], target_path: tk.StringVar, frames_amount: tk.IntVar): - path = filedialog.askopenfilename(title="Select a target") - target_path.set(path) - amount, frame = select_target_handler(path) - frames_amount.set(amount) - preview_target(frame) - update_preview(frame) + target_button = ctk.CTkButton(root, text='Select a target', command=lambda: select_target_path()) + target_button.place(relx=0.6, rely=0.4, relwidth=0.3, relheight=0.1) + keep_fps_value = ctk.BooleanVar(value=roop.globals.keep_fps) + keep_fps_checkbox = ctk.CTkSwitch(root, text='Keep fps', variable=keep_fps_value, command=lambda: setattr(roop.globals, 'keep_fps', not roop.globals.keep_fps)) + keep_fps_checkbox.place(relx=0.1, rely=0.6) -def select_target(select_target_handler: Callable[[str], Tuple[int, Any]], target_path: tk.StringVar, frames_amount: tk.IntVar): - if select_target_handler: - analyze_target(select_target_handler, target_path, frames_amount) + keep_frames_value = ctk.BooleanVar(value=roop.globals.keep_frames) + keep_frames_switch = ctk.CTkSwitch(root, text='Keep frames', variable=keep_frames_value, command=lambda: setattr(roop.globals, 'keep_frames', keep_frames_value.get())) + keep_frames_switch.place(relx=0.1, rely=0.65) + keep_audio_value = ctk.BooleanVar(value=roop.globals.keep_audio) + keep_audio_switch = ctk.CTkSwitch(root, text='Keep audio', variable=keep_audio_value, command=lambda: setattr(roop.globals, 'keep_audio', keep_audio_value.get())) + keep_audio_switch.place(relx=0.6, rely=0.6) -def save_file(save_file_handler: Callable[[str], None], target_path: str): - filename, ext = 'output.mp4', '.mp4' + many_faces_value = ctk.BooleanVar(value=roop.globals.many_faces) + many_faces_switch = ctk.CTkSwitch(root, text='Many faces', variable=many_faces_value, command=lambda: setattr(roop.globals, 'many_faces', many_faces_value.get())) + many_faces_switch.place(relx=0.6, rely=0.65) - if is_img(target_path): - filename, ext = 'output.png', '.png' + start_button = ctk.CTkButton(root, text='Start', command=lambda: select_output_path(start)) + start_button.place(relx=0.15, rely=0.75, relwidth=0.2, relheight=0.05) - if save_file_handler: - return save_file_handler(asksaveasfilename(initialfile=filename, defaultextension=ext, filetypes=[("All Files","*.*"),("Videos","*.mp4")])) - return None + stop_button = ctk.CTkButton(root, text='Destroy', command=lambda: destroy()) + stop_button.place(relx=0.4, rely=0.75, relwidth=0.2, relheight=0.05) + preview_button = ctk.CTkButton(root, text='Preview', command=lambda: toggle_preview()) + preview_button.place(relx=0.65, rely=0.75, relwidth=0.2, relheight=0.05) -def toggle_all_faces(toggle_all_faces_handler: Callable[[int], None], variable: tk.IntVar): - if toggle_all_faces_handler: - return lambda: toggle_all_faces_handler(variable.get()) - return None + status_label = ctk.CTkLabel(root, text=None, justify='center') + status_label.place(relx=0.1, rely=0.9, relwidth=0.8) + return root -def toggle_fps_limit(toggle_all_faces_handler: Callable[[int], None], variable: tk.IntVar): - if toggle_all_faces_handler: - return lambda: toggle_all_faces_handler(variable.get()) - return None +def create_preview(parent: ctk.CTkToplevel) -> ctk.CTkToplevel: + global preview_label, preview_slider -def toggle_keep_frames(toggle_keep_frames_handler: Callable[[int], None], variable: tk.IntVar): - if toggle_keep_frames_handler: - return lambda: toggle_keep_frames_handler(variable.get()) - return None + preview = ctk.CTkToplevel(parent) + preview.withdraw() + preview.title('Preview') + preview.configure() + preview.protocol('WM_DELETE_WINDOW', lambda: toggle_preview()) + preview.resizable(width=False, height=False) + + preview_label = ctk.CTkLabel(preview, text=None) + preview_label.pack(fill='both', expand=True) + preview_slider = ctk.CTkSlider(preview, from_=0, to=0, command=lambda frame_value: update_preview(frame_value)) -def create_button(parent, text, command): - return tk.Button( - parent, - text=text, - command=command, - bg="#f1c40f", - relief="flat", - borderwidth=0, - highlightthickness=0 - ) + return preview -def create_background_button(parent, text, command): - button = create_button(parent, text, command) - button.configure( - bg="#2d3436", - fg="#74b9ff", - highlightthickness=4, - highlightbackground="#74b9ff", - activebackground="#74b9ff", - borderwidth=4 - ) - return button +def update_status(text: str) -> None: + status_label.configure(text=text) + ROOT.update() -def create_check(parent, text, variable, command): - return tk.Checkbutton( - parent, - anchor="w", - relief="groove", - activebackground="#2d3436", - activeforeground="#74b9ff", - selectcolor="black", - text=text, - fg="#dfe6e9", - borderwidth=0, - highlightthickness=0, - bg="#2d3436", - variable=variable, - command=command - ) +def select_source_path() -> None: + global RECENT_DIRECTORY_SOURCE + PREVIEW.withdraw() + source_path = ctk.filedialog.askopenfilename(title='select an source image', initialdir=RECENT_DIRECTORY_SOURCE) + if is_image(source_path): + roop.globals.source_path = source_path + RECENT_DIRECTORY_SOURCE = os.path.dirname(roop.globals.source_path) + image = render_image_preview(roop.globals.source_path, (200, 200)) + source_label.configure(image=image) + else: + roop.globals.source_path = None + source_label.configure(image=None) + + +def select_target_path() -> None: + global RECENT_DIRECTORY_TARGET + + PREVIEW.withdraw() + target_path = ctk.filedialog.askopenfilename(title='select an target image or video', initialdir=RECENT_DIRECTORY_TARGET) + if is_image(target_path): + roop.globals.target_path = target_path + RECENT_DIRECTORY_TARGET = os.path.dirname(roop.globals.target_path) + image = render_image_preview(roop.globals.target_path, (200, 200)) + target_label.configure(image=image) + elif is_video(target_path): + roop.globals.target_path = target_path + RECENT_DIRECTORY_TARGET = os.path.dirname(roop.globals.target_path) + video_frame = render_video_preview(target_path, (200, 200)) + target_label.configure(image=video_frame) + else: + roop.globals.target_path = None + target_label.configure(image=None) -def preview_thread(thread_function): - threading.Thread(target=thread_function).start() +def select_output_path(start: Callable[[], None]) -> None: + global RECENT_DIRECTORY_OUTPUT -def open_preview_window(get_video_frame, target_path): - if preview_visible.get(): - hide_preview() + if is_image(roop.globals.target_path): + output_path = ctk.filedialog.asksaveasfilename(title='save image output file', initialfile='output.png', initialdir=RECENT_DIRECTORY_OUTPUT) + elif is_video(roop.globals.target_path): + output_path = ctk.filedialog.asksaveasfilename(title='save video output file', initialfile='output.mp4', initialdir=RECENT_DIRECTORY_OUTPUT) else: - show_preview() - if target_path: - frame = get_video_frame(target_path) - update_preview(frame) - - -def preview_face(path): - img = Image.open(path) - img = img.resize((180, 180), Image.ANTIALIAS) - photo_img = ImageTk.PhotoImage(img) - face_label.configure(image=photo_img) - face_label.image = photo_img - - -def preview_target(frame): - img = Image.fromarray(frame) - img = img.resize((180, 180), Image.ANTIALIAS) - photo_img = ImageTk.PhotoImage(img) - target_label.configure(image=photo_img) - target_label.image = photo_img - - -def update_status_label(value): - status_label["text"] = value - window.update() - - -def init( - initial_values: dict, - select_face_handler: Callable[[str], None], - select_target_handler: Callable[[str], Tuple[int, Any]], - toggle_all_faces_handler: Callable[[int], None], - toggle_fps_limit_handler: Callable[[int], None], - toggle_keep_frames_handler: Callable[[int], None], - save_file_handler: Callable[[str], None], - start: Callable[[], None], - get_video_frame: Callable[[str, int], None], - create_test_preview: Callable[[int], Any], -): - global window, preview, preview_visible, face_label, target_label, status_label - - window = tk.Tk() - window.geometry("600x700") - window.title("roop") - window.configure(bg="#2d3436") - window.resizable(width=False, height=False) - - preview_visible = tk.BooleanVar(window, False) - target_path = tk.StringVar() - frames_amount = tk.IntVar() - - # Preview window - preview = create_preview(window) - - # Contact information - support_link = tk.Label(window, text="Donate to project <3", fg="#fd79a8", bg="#2d3436", cursor="hand2", font=("Arial", 8)) - support_link.place(x=180,y=20,width=250,height=30) - support_link.bind("", lambda e: webbrowser.open("https://github.com/sponsors/s0md3v")) - - left_frame = tk.Frame(window) - left_frame.place(x=60, y=100, width=180, height=180) - face_label = tk.Label(left_frame) - face_label.pack(fill='both', side='top', expand=True) - - right_frame = tk.Frame(window) - right_frame.place(x=360, y=100, width=180, height=180) - target_label = tk.Label(right_frame) - target_label.pack(fill='both', side='top', expand=True) - - # Select a face button - face_button = create_background_button(window, "Select a face", lambda: [ - select_face(select_face_handler) - ]) - face_button.place(x=60,y=320,width=180,height=80) - - # Select a target button - target_button = create_background_button(window, "Select a target", lambda: [ - select_target(select_target_handler, target_path, frames_amount), - update_slider(get_video_frame, create_test_preview, target_path.get(), frames_amount.get()) - ]) - target_button.place(x=360,y=320,width=180,height=80) - - # All faces checkbox - all_faces = tk.IntVar(None, initial_values['all_faces']) - all_faces_checkbox = create_check(window, "Process all faces in frame", all_faces, toggle_all_faces(toggle_all_faces_handler, all_faces)) - all_faces_checkbox.place(x=60,y=500,width=240,height=31) - - # FPS limit checkbox - limit_fps = tk.IntVar(None, not initial_values['keep_fps']) - fps_checkbox = create_check(window, "Limit FPS to 30", limit_fps, toggle_fps_limit(toggle_fps_limit_handler, limit_fps)) - fps_checkbox.place(x=60,y=475,width=240,height=31) - - # Keep frames checkbox - keep_frames = tk.IntVar(None, initial_values['keep_frames']) - frames_checkbox = create_check(window, "Keep frames dir", keep_frames, toggle_keep_frames(toggle_keep_frames_handler, keep_frames)) - frames_checkbox.place(x=60,y=450,width=240,height=31) - - # Start button - start_button = create_button(window, "Start", lambda: [save_file(save_file_handler, target_path.get()), preview_thread(lambda: start(update_preview))]) - start_button.place(x=170,y=560,width=120,height=49) - - # Preview button - preview_button = create_button(window, "Preview", lambda: open_preview_window(get_video_frame, target_path.get())) - preview_button.place(x=310,y=560,width=120,height=49) - - # Status label - status_label = tk.Label(window, width=580, justify="center", text="Status: waiting for input...", fg="#2ecc71", bg="#2d3436") - status_label.place(x=10,y=640,width=580,height=30) - - return window \ No newline at end of file + output_path = None + if output_path: + roop.globals.output_path = output_path + RECENT_DIRECTORY_OUTPUT = os.path.dirname(roop.globals.output_path) + start() + + +def render_image_preview(image_path: str, size: Tuple[int, int]) -> ctk.CTkImage: + image = Image.open(image_path) + if size: + image = ImageOps.fit(image, size, Image.LANCZOS) + return ctk.CTkImage(image, size=image.size) + + +def render_video_preview(video_path: str, size: Tuple[int, int], frame_number: int = 0) -> ctk.CTkImage: + capture = cv2.VideoCapture(video_path) + if frame_number: + capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number) + has_frame, frame = capture.read() + if has_frame: + image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) + if size: + image = ImageOps.fit(image, size, Image.LANCZOS) + return ctk.CTkImage(image, size=image.size) + capture.release() + cv2.destroyAllWindows() + + +def toggle_preview() -> None: + if PREVIEW.state() == 'normal': + PREVIEW.withdraw() + elif roop.globals.source_path and roop.globals.target_path: + init_preview() + update_preview() + PREVIEW.deiconify() + + +def init_preview() -> None: + if is_image(roop.globals.target_path): + preview_slider.pack_forget() + if is_video(roop.globals.target_path): + video_frame_total = get_video_frame_total(roop.globals.target_path) + preview_slider.configure(to=video_frame_total) + preview_slider.pack(fill='x') + preview_slider.set(0) + + +def update_preview(frame_number: int = 0) -> None: + if roop.globals.source_path and roop.globals.target_path: + temp_frame = get_video_frame(roop.globals.target_path, frame_number) + if predict_frame(temp_frame): + quit() + for frame_processor in get_frame_processors_modules(roop.globals.frame_processors): + temp_frame = frame_processor.process_frame( + get_one_face(cv2.imread(roop.globals.source_path)), + temp_frame + ) + image = Image.fromarray(cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB)) + image = ImageOps.contain(image, (PREVIEW_MAX_WIDTH, PREVIEW_MAX_HEIGHT), Image.LANCZOS) + image = ctk.CTkImage(image, size=image.size) + preview_label.configure(image=image) diff --git a/roop/utilities.py b/roop/utilities.py new file mode 100644 index 000000000..433cb43b1 --- /dev/null +++ b/roop/utilities.py @@ -0,0 +1,141 @@ +import glob +import mimetypes +import os +import platform +import shutil +import ssl +import subprocess +import urllib +from pathlib import Path +from typing import List, Any +from tqdm import tqdm + +import roop.globals + +TEMP_FILE = 'temp.mp4' +TEMP_DIRECTORY = 'temp' + +# monkey patch ssl for mac +if platform.system().lower() == 'darwin': + ssl._create_default_https_context = ssl._create_unverified_context + + +def run_ffmpeg(args: List[str]) -> bool: + commands = ['ffmpeg', '-hide_banner', '-hwaccel', 'auto', '-loglevel', roop.globals.log_level] + commands.extend(args) + try: + subprocess.check_output(commands, stderr=subprocess.STDOUT) + return True + except Exception: + pass + return False + + +def detect_fps(target_path: str) -> float: + command = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=r_frame_rate', '-of', 'default=noprint_wrappers=1:nokey=1', target_path] + output = subprocess.check_output(command).decode().strip().split('/') + try: + numerator, denominator = map(int, output) + return numerator / denominator + except Exception: + pass + return 30.0 + + +def extract_frames(target_path: str) -> None: + temp_directory_path = get_temp_directory_path(target_path) + run_ffmpeg(['-i', target_path, '-pix_fmt', 'rgb24', os.path.join(temp_directory_path, '%04d.png')]) + + +def create_video(target_path: str, fps: float = 30.0) -> None: + temp_output_path = get_temp_output_path(target_path) + temp_directory_path = get_temp_directory_path(target_path) + run_ffmpeg(['-r', str(fps), '-i', os.path.join(temp_directory_path, '%04d.png'), '-c:v', roop.globals.video_encoder, '-crf', str(roop.globals.video_quality), '-pix_fmt', 'yuv420p', '-vf', 'colorspace=bt709:iall=bt601-6-625:fast=1', '-y', temp_output_path]) + + +def restore_audio(target_path: str, output_path: str) -> None: + temp_output_path = get_temp_output_path(target_path) + done = run_ffmpeg(['-i', temp_output_path, '-i', target_path, '-c:v', 'copy', '-map', '0:v:0', '-map', '1:a:0', '-y', output_path]) + if not done: + move_temp(target_path, output_path) + + +def get_temp_frame_paths(target_path: str) -> List[str]: + temp_directory_path = get_temp_directory_path(target_path) + return glob.glob((os.path.join(glob.escape(temp_directory_path), '*.png'))) + + +def get_temp_directory_path(target_path: str) -> str: + target_name, _ = os.path.splitext(os.path.basename(target_path)) + target_directory_path = os.path.dirname(target_path) + return os.path.join(target_directory_path, TEMP_DIRECTORY, target_name) + + +def get_temp_output_path(target_path: str) -> str: + temp_directory_path = get_temp_directory_path(target_path) + return os.path.join(temp_directory_path, TEMP_FILE) + + +def normalize_output_path(source_path: str, target_path: str, output_path: str) -> Any: + if source_path and target_path: + source_name, _ = os.path.splitext(os.path.basename(source_path)) + target_name, target_extension = os.path.splitext(os.path.basename(target_path)) + if os.path.isdir(output_path): + return os.path.join(output_path, source_name + '-' + target_name + target_extension) + return output_path + + +def create_temp(target_path: str) -> None: + temp_directory_path = get_temp_directory_path(target_path) + Path(temp_directory_path).mkdir(parents=True, exist_ok=True) + + +def move_temp(target_path: str, output_path: str) -> None: + temp_output_path = get_temp_output_path(target_path) + if os.path.isfile(temp_output_path): + if os.path.isfile(output_path): + os.remove(output_path) + shutil.move(temp_output_path, output_path) + + +def clean_temp(target_path: str) -> None: + temp_directory_path = get_temp_directory_path(target_path) + parent_directory_path = os.path.dirname(temp_directory_path) + if not roop.globals.keep_frames and os.path.isdir(temp_directory_path): + shutil.rmtree(temp_directory_path) + if os.path.exists(parent_directory_path) and not os.listdir(parent_directory_path): + os.rmdir(parent_directory_path) + + +def has_image_extension(image_path: str) -> bool: + return image_path.lower().endswith(('png', 'jpg', 'jpeg')) + + +def is_image(image_path: str) -> bool: + if image_path and os.path.isfile(image_path): + mimetype, _ = mimetypes.guess_type(image_path) + return bool(mimetype and mimetype.startswith('image/')) + return False + + +def is_video(video_path: str) -> bool: + if video_path and os.path.isfile(video_path): + mimetype, _ = mimetypes.guess_type(video_path) + return bool(mimetype and mimetype.startswith('video/')) + return False + + +def conditional_download(download_directory_path: str, urls: List[str]) -> None: + if not os.path.exists(download_directory_path): + os.makedirs(download_directory_path) + for url in urls: + download_file_path = os.path.join(download_directory_path, os.path.basename(url)) + if not os.path.exists(download_file_path): + request = urllib.request.urlopen(url) # type: ignore[attr-defined] + total = int(request.headers.get('Content-Length', 0)) + with tqdm(total=total, desc='Downloading', unit='B', unit_scale=True, unit_divisor=1024) as progress: + urllib.request.urlretrieve(url, download_file_path, reporthook=lambda count, block_size, total_size: progress.update(block_size)) # type: ignore[attr-defined] + + +def resolve_relative_path(path: str) -> str: + return os.path.abspath(os.path.join(os.path.dirname(__file__), path)) diff --git a/roop/utils.py b/roop/utils.py deleted file mode 100644 index 3ec687294..000000000 --- a/roop/utils.py +++ /dev/null @@ -1,72 +0,0 @@ -import os -import shutil -import roop.globals - -sep = "/" -if os.name == "nt": - sep = "\\" - - -def path(string): - if sep == "\\": - return string.replace("/", "\\") - return string - - -def run_command(command, mode="silent"): - if mode == "debug": - return os.system(command) - return os.popen(command).read() - - -def detect_fps(input_path): - input_path = path(input_path) - output = os.popen(f'ffprobe -v error -select_streams v -of default=noprint_wrappers=1:nokey=1 -show_entries stream=r_frame_rate "{input_path}"').read() - if "/" in output: - try: - return int(output.split("/")[0]) // int(output.split("/")[1].strip()), output.strip() - except: - pass - return 30, 30 - - -def run_ffmpeg(args): - log_level = f'-loglevel {roop.globals.log_level}' - run_command(f'ffmpeg {log_level} {args}') - - -def set_fps(input_path, output_path, fps): - input_path, output_path = path(input_path), path(output_path) - run_ffmpeg(f'-i "{input_path}" -filter:v fps=fps={fps} "{output_path}"') - - -def create_video(video_name, fps, output_dir): - hwaccel_option = '-hwaccel cuda' if roop.globals.gpu_vendor == 'nvidia' else '' - output_dir = path(output_dir) - run_ffmpeg(f'{hwaccel_option} -framerate "{fps}" -i "{output_dir}{sep}%04d.png" -c:v libx264 -crf 7 -pix_fmt yuv420p -y "{output_dir}{sep}output.mp4"') - - -def extract_frames(input_path, output_dir): - hwaccel_option = '-hwaccel cuda' if roop.globals.gpu_vendor == 'nvidia' else '' - input_path, output_dir = path(input_path), path(output_dir) - run_ffmpeg(f' {hwaccel_option} -i "{input_path}" "{output_dir}{sep}%04d.png"') - - -def add_audio(output_dir, target_path, video, keep_frames, output_file): - video_name = os.path.splitext(video)[0] - save_to = output_file if output_file else output_dir + "/swapped-" + video_name + ".mp4" - save_to_ff, output_dir_ff = path(save_to), path(output_dir) - run_ffmpeg(f'-i "{output_dir_ff}{sep}output.mp4" -i "{output_dir_ff}{sep}{video}" -c:v copy -map 0:v:0 -map 1:a:0 -y "{save_to_ff}"') - if not os.path.isfile(save_to): - shutil.move(output_dir + "/output.mp4", save_to) - if not keep_frames: - shutil.rmtree(output_dir) - - -def is_img(path): - return path.lower().endswith(("png", "jpg", "jpeg", "bmp")) - - -def rreplace(s, old, new, occurrence): - li = s.rsplit(old, occurrence) - return new.join(li)