From 6a100b1526f2438f417a0b7180e8265390ed8814 Mon Sep 17 00:00:00 2001 From: k4yt3x Date: Wed, 26 Feb 2020 05:27:57 -0500 Subject: [PATCH] removed multi-threading in favor of muti-processing --- src/anime4k.py | 93 +++++++++------- src/upscaler.py | 220 ++++++++++++++++++------------------- src/waifu2x_caffe.py | 73 ++++++------ src/waifu2x_converter.py | 80 +++++++------- src/waifu2x_ncnn_vulkan.py | 62 +++++------ 5 files changed, 254 insertions(+), 274 deletions(-) diff --git a/src/anime4k.py b/src/anime4k.py index 61bc262..49a443d 100755 --- a/src/anime4k.py +++ b/src/anime4k.py @@ -4,13 +4,16 @@ Name: Anime4K Driver Author: K4YT3X Date Created: August 15, 2019 -Last Modified: November 15, 2019 +Last Modified: February 26, 2020 Description: This class is a high-level wrapper for Anime4k. """ # built-in imports +import os +import queue +import shlex import subprocess import threading @@ -31,7 +34,7 @@ class Anime4k: self.driver_settings = driver_settings self.print_lock = threading.Lock() - def upscale(self, input_directory, output_directory, scale_ratio, upscaler_exceptions, push_strength=None, push_grad_strength=None): + def upscale(self, input_directory, output_directory, scale_ratio, processes, push_strength=None, push_grad_strength=None): """ Anime4K wrapper Arguments: @@ -46,47 +49,63 @@ class Anime4k: Returns: subprocess.Popen.returncode -- command line return value of execution """ - try: - # return value is the sum of all execution return codes - return_value = 0 - # get a list lof all image files in input_directory - extracted_frame_files = [f for f in input_directory.iterdir() if str(f).lower().endswith('.png') or str(f).lower().endswith('.jpg')] + # a list of all commands to be executed + commands = queue.Queue() - # upscale each image in input_directory - for image in extracted_frame_files: + # get a list lof all image files in input_directory + extracted_frame_files = [f for f in input_directory.iterdir() if str(f).lower().endswith('.png') or str(f).lower().endswith('.jpg')] - execute = [ - self.driver_settings['java_path'], - '-jar', - self.driver_settings['path'], - str(image.absolute()), - str(output_directory / image.name), - str(scale_ratio) - ] + # upscale each image in input_directory + for image in extracted_frame_files: - # optional arguments - kwargs = [ - 'push_strength', - 'push_grad_strength' - ] + execute = [ + self.driver_settings['java_path'], + '-jar', + self.driver_settings['path'], + str(image.absolute()), + str(output_directory / image.name), + str(scale_ratio) + ] - # if optional argument specified, append value to execution list - for arg in kwargs: - if locals()[arg] is not None: - execute.extend([locals([arg])]) + # optional arguments + kwargs = [ + 'push_strength', + 'push_grad_strength' + ] + + # if optional argument specified, append value to execution list + for arg in kwargs: + if locals()[arg] is not None: + execute.extend([locals([arg])]) + + commands.put(execute) + + # initialize two lists to hold running and finished processes + anime4k_running_processes = [] + anime4k_finished_processes = [] + + # run all commands in queue + while not commands.empty(): + + # if any commands have completed + # remove the subprocess.Popen project and move it into finished processes + for process in anime4k_running_processes: + if process.poll() is not None: + Avalon.debug_info(f'Subprocess {process.pid} exited with code {process.poll()}') + anime4k_finished_processes.append(process) + anime4k_running_processes.remove(process) + + # when number running processes is less than what's specified + # create new processes and add to running process pool + while len(anime4k_running_processes) < processes: + next_in_queue = commands.get() + new_process = subprocess.Popen(next_in_queue) + anime4k_running_processes.append(new_process) self.print_lock.acquire() - Avalon.debug_info(f'Executing: {execute}', ) + Avalon.debug_info(f'[upscaler] Subprocess {new_process.pid} executing: {shlex.join(next_in_queue)}') self.print_lock.release() - return_value += subprocess.run(execute, check=True).returncode - # print thread exiting message - self.print_lock.acquire() - Avalon.debug_info(f'[upscaler] Thread {threading.current_thread().name} exiting') - self.print_lock.release() - - # return command execution return code - return return_value - except Exception as e: - upscaler_exceptions.append(e) + # return command execution return code + return anime4k_finished_processes diff --git a/src/upscaler.py b/src/upscaler.py index ec7e359..7871777 100755 --- a/src/upscaler.py +++ b/src/upscaler.py @@ -30,6 +30,7 @@ import copy import pathlib import re import shutil +import sys import tempfile import threading import time @@ -65,7 +66,7 @@ class Upscaler: self.scale_height = None self.scale_ratio = None self.model_dir = None - self.threads = 1 + self.processes = 1 self.video2x_cache_directory = pathlib.Path(tempfile.gettempdir()) / 'video2x' self.image_format = 'png' self.preserve_frames = False @@ -150,150 +151,136 @@ class Upscaler: w2 {Waifu2x Object} -- initialized waifu2x object """ - # progress bar thread exit signal + # progress bar process exit signal self.progress_bar_exit_signal = False - # create a container for exceptions in threads - # if this thread is not empty, then an exception has occured - self.upscaler_exceptions = [] - # initialize waifu2x driver - drivers = AVAILABLE_DRIVERS - if self.waifu2x_driver not in drivers: - raise UnrecognizedDriverError(f'Unrecognized waifu2x driver: {self.waifu2x_driver}') + if self.waifu2x_driver not in AVAILABLE_DRIVERS: + raise UnrecognizedDriverError(f'Unrecognized driver: {self.waifu2x_driver}') - # it's easier to do multi-threading with waifu2x_converter - # the number of threads can be passed directly to waifu2x_converter - if self.waifu2x_driver == 'waifu2x_converter': - w2 = Waifu2xConverter(self.driver_settings, self.model_dir) + # create a container for all upscaler processes + upscaler_processes = [] - progress_bar = threading.Thread(target=self._progress_bar, args=([self.extracted_frames],)) - progress_bar.start() + # list all images in the extracted frames + frames = [(self.extracted_frames / f) for f in self.extracted_frames.iterdir() if f.is_file] - w2.upscale(self.extracted_frames, self.upscaled_frames, self.scale_ratio, self.threads, self.image_format, self.upscaler_exceptions) - for image in [f for f in self.upscaled_frames.iterdir() if f.is_file()]: - renamed = re.sub(f'_\[.*-.*\]\[x(\d+(\.\d+)?)\]\.{self.image_format}', f'.{self.image_format}', str(image)) - (self.upscaled_frames / image).rename(self.upscaled_frames / renamed) + # if we have less images than processes, + # create only the processes necessary + if len(frames) < self.processes: + self.processes = len(frames) - self.progress_bar_exit_signal = True - progress_bar.join() - return + # create a directory for each process and append directory + # name into a list + process_directories = [] + for process_id in range(self.processes): + process_directory = self.extracted_frames / str(process_id) + process_directories.append(process_directory) + + # delete old directories and create new directories + if process_directory.is_dir(): + shutil.rmtree(process_directory) + process_directory.mkdir(parents=True, exist_ok=True) + + # waifu2x-converter-cpp will perform multi-threading within its own process + if self.waifu2x_driver in ['waifu2x_converter', 'anime4k'] : + process_directories = [self.extracted_frames] - # drivers that are to be multi-threaded by video2x else: - # create a container for all upscaler threads - upscaler_threads = [] - - # list all images in the extracted frames - frames = [(self.extracted_frames / f) for f in self.extracted_frames.iterdir() if f.is_file] - - # if we have less images than threads, - # create only the threads necessary - if len(frames) < self.threads: - self.threads = len(frames) - - # create a directory for each thread and append directory - # name into a list - - thread_pool = [] - thread_directories = [] - for thread_id in range(self.threads): - thread_directory = self.extracted_frames / str(thread_id) - thread_directories.append(thread_directory) - - # delete old directories and create new directories - if thread_directory.is_dir(): - shutil.rmtree(thread_directory) - thread_directory.mkdir(parents=True, exist_ok=True) - - # append directory path into list - thread_pool.append((thread_directory, thread_id)) - # evenly distribute images into each directory # until there is none left in the directory for image in frames: # move image - image.rename(thread_pool[0][0] / image.name) + image.rename(process_directories[0] / image.name) # rotate list - thread_pool = thread_pool[-1:] + thread_pool[:-1] + process_directories = process_directories[-1:] + process_directories[:-1] - # create threads and start them - for thread_info in thread_pool: + # create threads and start them + for process_directory in process_directories: - # create a separate w2 instance for each thread - if self.waifu2x_driver == 'waifu2x_caffe': - w2 = Waifu2xCaffe(copy.deepcopy(self.driver_settings), self.method, self.model_dir, self.bit_depth) - if self.scale_ratio: - thread = threading.Thread(target=w2.upscale, - args=(thread_info[0], - self.upscaled_frames, - self.scale_ratio, - False, - False, - self.image_format, - self.upscaler_exceptions)) - else: - thread = threading.Thread(target=w2.upscale, - args=(thread_info[0], - self.upscaled_frames, - False, - self.scale_width, - self.scale_height, - self.image_format, - self.upscaler_exceptions)) + # if the driver being used is waifu2x-caffe + if self.waifu2x_driver == 'waifu2x_caffe': + driver = Waifu2xCaffe(copy.deepcopy(self.driver_settings), self.method, self.model_dir, self.bit_depth) + if self.scale_ratio: + upscaler_processes.append(driver.upscale(process_directory, + self.upscaled_frames, + self.scale_ratio, + False, + False, + self.image_format)) + else: + upscaler_processes.append(driver.upscale(process_directory, + self.upscaled_frames, + False, + self.scale_width, + self.scale_height, + self.image_format)) - # if the driver being used is waifu2x_ncnn_vulkan - elif self.waifu2x_driver == 'waifu2x_ncnn_vulkan': - w2 = Waifu2xNcnnVulkan(copy.deepcopy(self.driver_settings)) - thread = threading.Thread(target=w2.upscale, - args=(thread_info[0], - self.upscaled_frames, - self.scale_ratio, - self.upscaler_exceptions)) + # if the driver being used is waifu2x-converter-cpp + elif self.waifu2x_driver == 'waifu2x_converter': + driver = Waifu2xConverter(self.driver_settings, self.model_dir) + upscaler_processes.append(driver.upscale(process_directory, + self.upscaled_frames, + self.scale_ratio, + self.processes, + self.image_format)) - # if the driver being used is anime4k - elif self.waifu2x_driver == 'anime4k': - w2 = Anime4k(copy.deepcopy(self.driver_settings)) - thread = threading.Thread(target=w2.upscale, - args=(thread_info[0], - self.upscaled_frames, - self.scale_ratio, - self.upscaler_exceptions)) + # if the driver being used is waifu2x-ncnn-vulkan + elif self.waifu2x_driver == 'waifu2x_ncnn_vulkan': + driver = Waifu2xNcnnVulkan(copy.deepcopy(self.driver_settings)) + upscaler_processes.append(driver.upscale(process_directory, + self.upscaled_frames, + self.scale_ratio)) - # create thread - thread.name = thread_info[1] + # if the driver being used is anime4k + elif self.waifu2x_driver == 'anime4k': + driver = Anime4k(copy.deepcopy(self.driver_settings)) + upscaler_processes += driver.upscale(process_directory, + self.upscaled_frames, + self.scale_ratio, + self.processes) - # add threads into the pool - upscaler_threads.append(thread) + # start progress bar in a different thread + progress_bar = threading.Thread(target=self._progress_bar, args=(process_directories,)) + progress_bar.start() - # start progress bar in a different thread - progress_bar = threading.Thread(target=self._progress_bar, args=(thread_directories,)) - progress_bar.start() + # create the clearer and start it + Avalon.debug_info('Starting upscaled image cleaner') + image_cleaner = ImageCleaner(self.extracted_frames, self.upscaled_frames, len(upscaler_processes)) + image_cleaner.start() - # create the clearer and start it - Avalon.debug_info('Starting upscaled image cleaner') - image_cleaner = ImageCleaner(self.extracted_frames, self.upscaled_frames, len(upscaler_threads)) - image_cleaner.start() + # wait for all process to exit + try: + Avalon.debug_info('Main process waiting for subprocesses to exit') + for process in upscaler_processes: + Avalon.debug_info(f'Subprocess {process.pid} exited with code {process.wait()}') + except (KeyboardInterrupt, SystemExit): + Avalon.warning('Exit signal received') + Avalon.warning('Killing processes') + for process in upscaler_processes: + process.terminate() - # start all threads - for thread in upscaler_threads: - thread.start() - - # wait for threads to finish - for thread in upscaler_threads: - thread.join() - - # upscaling done, kill the clearer + # cleanup and exit with exit code 1 Avalon.debug_info('Killing upscaled image cleaner') image_cleaner.stop() - self.progress_bar_exit_signal = True + sys.exit(1) - if len(self.upscaler_exceptions) != 0: - raise(self.upscaler_exceptions[0]) + # if the driver is waifu2x-converter-cpp + # images need to be renamed to be recognizable for FFmpeg + if self.waifu2x_driver == 'waifu2x_converter': + for image in [f for f in self.upscaled_frames.iterdir() if f.is_file()]: + renamed = re.sub(f'_\\[.*\\]\\[x(\\d+(\\.\\d+)?)\\]\\.{self.image_format}', f'.{self.image_format}', str(image.name)) + (self.upscaled_frames / image).rename(self.upscaled_frames / renamed) + + # upscaling done, kill the clearer + Avalon.debug_info('Killing upscaled image cleaner') + image_cleaner.stop() + + # pass exit signal to progress bar thread + self.progress_bar_exit_signal = True def run(self): - """Main controller for Video2X + """ Main controller for Video2X This function controls the flow of video conversion and handles all necessary functions. @@ -337,6 +324,7 @@ class Upscaler: # get a dict of all pixel formats and corresponding bit depth pixel_formats = fm.get_pixel_formats() + # try getting pixel format's corresponding bti depth try: self.bit_depth = pixel_formats[fm.pixel_format] except KeyError: diff --git a/src/waifu2x_caffe.py b/src/waifu2x_caffe.py index 179d7d5..cb8cd97 100755 --- a/src/waifu2x_caffe.py +++ b/src/waifu2x_caffe.py @@ -4,13 +4,15 @@ Name: Waifu2x Caffe Driver Author: K4YT3X Date Created: Feb 24, 2018 -Last Modified: October 6, 2019 +Last Modified: February 22, 2020 Description: This class is a high-level wrapper for waifu2x-caffe. """ # built-in imports +import os +import shlex import subprocess import threading @@ -38,7 +40,7 @@ class Waifu2xCaffe: self.model_dir = model_dir self.print_lock = threading.Lock() - def upscale(self, input_directory, output_directory, scale_ratio, scale_width, scale_height, image_format, upscaler_exceptions): + def upscale(self, input_directory, output_directory, scale_ratio, scale_width, scale_height, image_format): """This is the core function for WAIFU2X class Arguments: @@ -48,51 +50,38 @@ class Waifu2xCaffe: height {int} -- output video height """ - try: - # overwrite config file settings - self.driver_settings['input_path'] = input_directory - self.driver_settings['output_path'] = output_directory + # overwrite config file settings + self.driver_settings['input_path'] = input_directory + self.driver_settings['output_path'] = output_directory - if scale_ratio: - self.driver_settings['scale_ratio'] = scale_ratio - elif scale_width and scale_height: - self.driver_settings['scale_width'] = scale_width - self.driver_settings['scale_height'] = scale_height + if scale_ratio: + self.driver_settings['scale_ratio'] = scale_ratio + elif scale_width and scale_height: + self.driver_settings['scale_width'] = scale_width + self.driver_settings['scale_height'] = scale_height - self.driver_settings['output_extention'] = image_format + self.driver_settings['output_extention'] = image_format - # print thread start message - self.print_lock.acquire() - Avalon.debug_info(f'[upscaler] Thread {threading.current_thread().name} started') - self.print_lock.release() + # list to be executed + # initialize the list with waifu2x binary path as the first element + execute = [str(self.driver_settings['path'])] - # list to be executed - # initialize the list with waifu2x binary path as the first element - execute = [str(self.driver_settings['path'])] + for key in self.driver_settings.keys(): - for key in self.driver_settings.keys(): + value = self.driver_settings[key] - value = self.driver_settings[key] - - # is executable key or null or None means that leave this option out (keep default) - if key == 'path' or value is None or value is False: - continue + # is executable key or null or None means that leave this option out (keep default) + if key == 'path' or value is None or value is False: + continue + else: + if len(key) == 1: + execute.append(f'-{key}') else: - if len(key) == 1: - execute.append(f'-{key}') - else: - execute.append(f'--{key}') - execute.append(str(value)) + execute.append(f'--{key}') + execute.append(str(value)) - Avalon.debug_info(f'Executing: {execute}') - completed_command = subprocess.run(execute, check=True) - - # print thread exiting message - self.print_lock.acquire() - Avalon.debug_info(f'[upscaler] Thread {threading.current_thread().name} exiting') - self.print_lock.release() - - # return command execution return code - return completed_command.returncode - except Exception as e: - upscaler_exceptions.append(e) + # return the Popen object of the new process created + self.print_lock.acquire() + Avalon.debug_info(f'[upscaler] Subprocess {os.getpid()} executing: {shlex.join(execute)}') + self.print_lock.release() + return subprocess.Popen(execute) diff --git a/src/waifu2x_converter.py b/src/waifu2x_converter.py index 388b69d..e1d224a 100755 --- a/src/waifu2x_converter.py +++ b/src/waifu2x_converter.py @@ -4,14 +4,16 @@ Name: Waifu2x Converter CPP Driver Author: K4YT3X Date Created: February 8, 2019 -Last Modified: October 6, 2019 +Last Modified: February 22, 2020 Description: This class is a high-level wrapper for waifu2x-converter-cpp. """ # built-in imports +import os import pathlib +import shlex import subprocess import threading @@ -33,7 +35,7 @@ class Waifu2xConverter: self.driver_settings['model_dir'] = model_dir self.print_lock = threading.Lock() - def upscale(self, input_directory, output_directory, scale_ratio, jobs, image_format, upscaler_exceptions): + def upscale(self, input_directory, output_directory, scale_ratio, jobs, image_format): """ Waifu2x Converter Driver Upscaler This method executes the upscaling of extracted frames. @@ -44,53 +46,47 @@ class Waifu2xConverter: threads {int} -- number of threads """ - try: - # overwrite config file settings - self.driver_settings['input'] = input_directory - self.driver_settings['output'] = output_directory - self.driver_settings['scale-ratio'] = scale_ratio - self.driver_settings['jobs'] = jobs - self.driver_settings['output-format'] = image_format + # overwrite config file settings + self.driver_settings['input'] = input_directory + self.driver_settings['output'] = output_directory + self.driver_settings['scale-ratio'] = scale_ratio + self.driver_settings['jobs'] = jobs + self.driver_settings['output-format'] = image_format - # models_rgb must be specified manually for waifu2x-converter-cpp - # if it's not specified in the arguments, create automatically - if self.driver_settings['model-dir'] is None: - self.driver_settings['model-dir'] = pathlib.Path(self.driver_settings['waifu2x_converter_path']) / 'models_rgb' + # models_rgb must be specified manually for waifu2x-converter-cpp + # if it's not specified in the arguments, create automatically + if self.driver_settings['model-dir'] is None: + self.driver_settings['model-dir'] = pathlib.Path(self.driver_settings['path']) / 'models_rgb' - # print thread start message - self.print_lock.acquire() - Avalon.debug_info(f'[upscaler] Thread {threading.current_thread().name} started') - self.print_lock.release() + # list to be executed + # initialize the list with waifu2x binary path as the first element + execute = [str(pathlib.Path(self.driver_settings['path']) / 'waifu2x-converter-cpp.exe')] - # list to be executed - # initialize the list with waifu2x binary path as the first element - execute = [str(pathlib.Path(self.driver_settings['path']) / 'waifu2x-converter-cpp.exe')] + for key in self.driver_settings.keys(): - for key in self.driver_settings.keys(): + value = self.driver_settings[key] - value = self.driver_settings[key] + # the key doesn't need to be passed in this case + if key == 'path': + continue - # the key doesn't need to be passed in this case - if key == 'path': - continue - - # null or None means that leave this option out (keep default) - elif value is None or value is False: - continue + # null or None means that leave this option out (keep default) + elif value is None or value is False: + continue + else: + if len(key) == 1: + execute.append(f'-{key}') else: - if len(key) == 1: - execute.append(f'-{key}') - else: - execute.append(f'--{key}') + execute.append(f'--{key}') - # true means key is an option - if value is True: - continue + # true means key is an option + if value is True: + continue - execute.append(str(value)) + execute.append(str(value)) - Avalon.debug_info(f'Executing: {execute}') - return subprocess.run(execute, check=True).returncode - - except Exception as e: - upscaler_exceptions.append(e) + # return the Popen object of the new process created + self.print_lock.acquire() + Avalon.debug_info(f'[upscaler] Subprocess {os.getpid()} executing: {shlex.join(execute)}') + self.print_lock.release() + return subprocess.Popen(execute) diff --git a/src/waifu2x_ncnn_vulkan.py b/src/waifu2x_ncnn_vulkan.py index 2664660..c3a7707 100755 --- a/src/waifu2x_ncnn_vulkan.py +++ b/src/waifu2x_ncnn_vulkan.py @@ -7,7 +7,7 @@ Date Created: June 26, 2019 Last Modified: November 15, 2019 Editor: K4YT3X -Last Modified: January 4, 2020 +Last Modified: February 22, 2020 Description: This class is a high-level wrapper for waifu2x_ncnn_vulkan. @@ -15,6 +15,7 @@ for waifu2x_ncnn_vulkan. # built-in imports import os +import shlex import subprocess import threading @@ -42,7 +43,7 @@ class Waifu2xNcnnVulkan: self.print_lock = threading.Lock() - def upscale(self, input_directory, output_directory, scale_ratio, upscaler_exceptions): + def upscale(self, input_directory, output_directory, scale_ratio): """This is the core function for WAIFU2X class Arguments: @@ -51,44 +52,31 @@ class Waifu2xNcnnVulkan: ratio {int} -- output video ratio """ - try: - # overwrite config file settings - self.driver_settings['i'] = input_directory - self.driver_settings['o'] = output_directory - self.driver_settings['s'] = int(scale_ratio) + # overwrite config file settings + self.driver_settings['i'] = input_directory + self.driver_settings['o'] = output_directory + self.driver_settings['s'] = int(scale_ratio) - # print thread start message - self.print_lock.acquire() - Avalon.debug_info(f'[upscaler] Thread {threading.current_thread().name} started') - self.print_lock.release() + # list to be executed + # initialize the list with waifu2x binary path as the first element + execute = [str(self.driver_settings['path'])] - # list to be executed - # initialize the list with waifu2x binary path as the first element - execute = [str(self.driver_settings['path'])] + for key in self.driver_settings.keys(): - for key in self.driver_settings.keys(): + value = self.driver_settings[key] - value = self.driver_settings[key] - - # is executable key or null or None means that leave this option out (keep default) - if key == 'path' or value is None or value is False: - continue + # is executable key or null or None means that leave this option out (keep default) + if key == 'path' or value is None or value is False: + continue + else: + if len(key) == 1: + execute.append(f'-{key}') else: - if len(key) == 1: - execute.append(f'-{key}') - else: - execute.append(f'--{key}') - execute.append(str(value)) + execute.append(f'--{key}') + execute.append(str(value)) - Avalon.debug_info(f'Executing: {execute}') - completed_command = subprocess.run(execute, check=True) - - # print thread exiting message - self.print_lock.acquire() - Avalon.debug_info(f'[upscaler] Thread {threading.current_thread().name} exiting') - self.print_lock.release() - - # return command execution return code - return completed_command.returncode - except Exception as e: - upscaler_exceptions.append(e) + # return the Popen object of the new process created + self.print_lock.acquire() + Avalon.debug_info(f'[upscaler] Subprocess {os.getpid()} executing: {shlex.join(execute)}') + self.print_lock.release() + return subprocess.Popen(execute)