made output extensions in batch processing customizable

This commit is contained in:
k4yt3x
2020-06-07 23:36:09 -04:00
parent 0b15fb7bd2
commit 7e87dac15e
5 changed files with 164 additions and 77 deletions

View File

@@ -4,7 +4,7 @@
Name: Video2X Upscaler
Author: K4YT3X
Date Created: December 10, 2018
Last Modified: June 5, 2020
Last Modified: June 7, 2020
Description: This file contains the Upscaler class. Each
instance of the Upscaler class is an upscaler on an image or
@@ -86,7 +86,9 @@ class Upscaler:
self.scale_ratio = None
self.processes = 1
self.video2x_cache_directory = pathlib.Path(tempfile.gettempdir()) / 'video2x'
self.image_format = 'png'
self.extracted_frame_format = 'png'
self.image_output_extension = '.png'
self.video_output_extension = '.mp4'
self.preserve_frames = False
# other internal members and signals
@@ -234,7 +236,7 @@ class Upscaler:
if self.driver == 'waifu2x_caffe':
if (driver_settings['scale_width'] != 0 and driver_settings['scale_height'] == 0 or
driver_settings['scale_width'] == 0 and driver_settings['scale_height'] != 0):
Avalon.error('Only one of scale_width and scale_height is specified for waifu2x-caffe')
Avalon.error(_('Only one of scale_width and scale_height is specified for waifu2x-caffe'))
raise AttributeError('only one of scale_width and scale_height is specified for waifu2x-caffe')
# if scale_width and scale_height are specified, ensure scale_ratio is None
@@ -324,8 +326,8 @@ class Upscaler:
# images need to be renamed to be recognizable for FFmpeg
if self.driver == 'waifu2x_converter_cpp':
for image in [f for f in self.upscaled_frames.iterdir() if f.is_file()]:
renamed = re.sub(f'_\\[.*\\]\\[x(\\d+(\\.\\d+)?)\\]\\.{self.image_format}',
f'.{self.image_format}',
renamed = re.sub(f'_\\[.*\\]\\[x(\\d+(\\.\\d+)?)\\]\\.{self.extracted_frame_format}',
f'.{self.extracted_frame_format}',
str(image.name))
(self.upscaled_frames / image).rename(self.upscaled_frames / renamed)
@@ -403,49 +405,93 @@ class Upscaler:
self.driver_object.load_configurations(self)
# initialize FFmpeg object
self.ffmpeg_object = Ffmpeg(self.ffmpeg_settings, image_format=self.image_format)
self.ffmpeg_object = Ffmpeg(self.ffmpeg_settings, extracted_frame_format=self.extracted_frame_format)
# define processing queue
self.processing_queue = queue.Queue()
Avalon.info(_('Loading files into processing queue'))
Avalon.debug_info(_('Input path(s): {}').format(self.input))
# if input is a list of files
if isinstance(self.input, list):
Avalon.info(_('Loading files from multiple paths'))
Avalon.debug_info(_('Input path(s): {}').format(self.input))
# make output directory if it doesn't exist
# make output directory if the input is a list or a directory
if isinstance(self.input, list) or self.input.is_dir():
self.output.mkdir(parents=True, exist_ok=True)
for input_path in self.input:
input_files = []
if input_path.is_file():
output_path = self.output / input_path.name
self.processing_queue.put((input_path.absolute(), output_path.absolute()))
# if input is single directory
# put it in a list for compability with the following code
if not isinstance(self.input, list):
input_paths = [self.input]
else:
input_paths = self.input
elif input_path.is_dir():
for input_path in [f for f in input_path.iterdir() if f.is_file()]:
output_path = self.output / input_path.name
self.processing_queue.put((input_path.absolute(), output_path.absolute()))
# flatten directories into file paths
for input_path in input_paths:
# if input specified is single file
elif self.input.is_file():
Avalon.info(_('Loading single file'))
Avalon.debug_info(_('Input path(s): {}').format(self.input))
self.processing_queue.put((self.input.absolute(), self.output.absolute()))
# if the input path is a single file
# add the file's path object to input_files
if input_path.is_file():
input_files.append(input_path)
# if input specified is a directory
elif self.input.is_dir():
# if the input path is a directory
# add all files under the directory into the input_files (non-recursive)
elif input_path.is_dir():
input_files.extend([f for f in input_path.iterdir() if f.is_file()])
Avalon.info(_('Loading files from directory'))
Avalon.debug_info(_('Input path(s): {}').format(self.input))
# make output directory if it doesn't exist
self.output.mkdir(parents=True, exist_ok=True)
for input_path in [f for f in self.input.iterdir() if f.is_file()]:
output_path = self.output / input_path.name
self.processing_queue.put((input_path.absolute(), output_path.absolute()))
output_paths = []
for input_path in input_files:
# get file type
# try python-magic if it's available
try:
input_file_mime_type = magic.from_file(str(input_path.absolute()), mime=True)
input_file_type = input_file_mime_type.split('/')[0]
input_file_subtype = input_file_mime_type.split('/')[1]
except Exception:
input_file_type = input_file_subtype = None
# in case python-magic fails to detect file type
# try guessing file mime type with mimetypes
if input_file_type not in ['image', 'video']:
input_file_mime_type = mimetypes.guess_type(input_path.name)[0]
input_file_type = input_file_mime_type.split('/')[0]
input_file_subtype = input_file_mime_type.split('/')[1]
# set default output file suffixes
# if image type is GIF, default output suffix is also .gif
if input_file_mime_type == 'image/gif':
output_path = self.output / (input_path.stem + '.gif')
elif input_file_type == 'image':
output_path = self.output / (input_path.stem + self.image_output_extension)
elif input_file_type == 'video':
output_path = self.output / (input_path.stem + self.video_output_extension)
# if file is none of: image, image/gif, video
# skip to the next task
else:
Avalon.error(_('File {} ({}) neither an image nor a video').format(input_path, input_file_mime_type))
Avalon.warning(_('Skipping this file'))
continue
# if there is only one input file
# do not modify output file suffix
if isinstance(self.input, pathlib.Path) and self.input.is_file():
output_path = self.output
output_path_id = 0
while str(output_path) in output_paths:
output_path = output_path.parent / pathlib.Path(f'{output_path.stem}_{output_path_id}{output_path.suffix}')
output_path_id += 1
# record output path
output_paths.append(str(output_path))
# push file information into processing queue
self.processing_queue.put((input_path.absolute(), output_path.absolute(), input_file_mime_type, input_file_type, input_file_subtype))
# check argument sanity before running
self._check_arguments()
@@ -461,27 +507,8 @@ class Upscaler:
try:
while not self.processing_queue.empty():
# reset current processing progress for new job
self.total_frames_upscaled = 0
self.total_frames = 0
# get new job from queue
self.current_input_file, output_path = self.processing_queue.get()
# get file type
try:
input_file_mime_type = magic.from_file(str(self.current_input_file.absolute()), mime=True)
input_file_type = input_file_mime_type.split('/')[0]
input_file_subtype = input_file_mime_type.split('/')[1]
except Exception:
input_file_type = input_file_subtype = None
# in case python-magic fails to detect file type
# try guessing file mime type with mimetypes
if input_file_type not in ['image', 'video']:
input_file_mime_type = mimetypes.guess_type(self.current_input_file.name)[0]
input_file_type = input_file_mime_type.split('/')[0]
input_file_subtype = input_file_mime_type.split('/')[1]
self.current_input_file, output_path, input_file_mime_type, input_file_type, input_file_subtype = self.processing_queue.get()
# start handling input
# if input file is a static image
@@ -553,15 +580,6 @@ class Upscaler:
self._upscale_frames()
Avalon.info(_('Upscaling completed'))
# if file is none of: image, image/gif, video
# skip to the next task
else:
Avalon.error(_('File {} ({}) neither an image nor a video').format(self.current_input_file, input_file_mime_type))
Avalon.warning(_('Skipping this file'))
self.processing_queue.task_done()
self.total_processed += 1
continue
# start handling output
# output can be either GIF or video
@@ -569,7 +587,7 @@ class Upscaler:
if output_path.suffix.lower() == '.gif':
Avalon.info(_('Converting extracted frames into GIF image'))
gifski_object = Gifski(self.gifski_settings)
self.process_pool.append(gifski_object.make_gif(self.upscaled_frames, output_path, framerate, self.image_format))
self.process_pool.append(gifski_object.make_gif(self.upscaled_frames, output_path, framerate, self.extracted_frame_format))
self._wait()
Avalon.info(_('Conversion completed'))