diff --git a/video2x/video2x.py b/video2x/video2x.py
index a24c063..c0cd4a3 100755
--- a/video2x/video2x.py
+++ b/video2x/video2x.py
@@ -27,7 +27,7 @@ __ __ _ _ ___ __ __
Name: Video2X
Creator: K4YT3X
Date Created: February 24, 2018
-Last Modified: April 5, 2022
+Last Modified: April 30, 2022
Editor: BrianPetkovsek
Last Modified: June 17, 2019
@@ -39,20 +39,18 @@ Editor: 28598519a
Last Modified: March 23, 2020
"""
-import argparse
import ctypes
import math
-import multiprocessing
-import os
-import pathlib
import signal
import sys
import time
+from enum import Enum
+from multiprocessing import Manager, Pool, Queue, Value
+from pathlib import Path
import ffmpeg
from cv2 import cv2
from loguru import logger
-from rich import print as rich_print
from rich.console import Console
from rich.file_proxy import FileProxy
from rich.progress import (
@@ -65,41 +63,23 @@ from rich.progress import (
)
from rich.text import Text
+from video2x.processor import Processor
+
from . import __version__
-from .decoder import VideoDecoder
+from .decoder import VideoDecoder, VideoDecoderThread
from .encoder import VideoEncoder
from .interpolator import Interpolator
-from .upscaler import Upscaler
+from .upscaler import UpscalerProcessor
# for desktop environments only
# if pynput can be loaded, enable global pause hotkey support
try:
- import pynput
+ from pynput.keyboard import HotKey, Listener
except ImportError:
ENABLE_HOTKEY = False
else:
ENABLE_HOTKEY = True
-LEGAL_INFO = f"""Video2X\t\t{__version__}
-Author:\t\tK4YT3X
-License:\tGNU AGPL v3
-Github Page:\thttps://github.com/k4yt3x/video2x
-Contact:\ti@k4yt3x.com"""
-
-# algorithms available for upscaling tasks
-UPSCALING_ALGORITHMS = [
- "waifu2x",
- "srmd",
- "realsr",
- "realcugan",
-]
-
-# algorithms available for frame interpolation tasks
-INTERPOLATION_ALGORITHMS = ["rife"]
-
-# progress bar labels for different modes
-MODE_LABELS = {"upscale": "Upscaling", "interpolate": "Interpolating"}
-
# format string for Loguru loggers
LOGURU_FORMAT = (
"{time:HH:mm:ss.SSSSSS!UTC} | "
@@ -119,6 +99,11 @@ class ProcessingSpeedColumn(ProgressColumn):
)
+class ProcessingMode(Enum):
+ UPSCALE = {"label": "Upscaling", "processor": UpscalerProcessor}
+ INTERPOLATE = {"label": "Interpolating", "processor": Interpolator}
+
+
class Video2X:
"""
Video2X class
@@ -132,11 +117,11 @@ class Video2X:
self.version = __version__
@staticmethod
- def _get_video_info(path: pathlib.Path) -> tuple:
+ def _get_video_info(path: Path) -> tuple:
"""
get video file information with FFmpeg
- :param path pathlib.Path: video file path
+ :param path Path: video file path
:raises RuntimeError: raised when video stream isn't found
"""
# probe video file info
@@ -160,34 +145,17 @@ class Video2X:
return video_info["width"], video_info["height"], total_frames, frame_rate
- def _toggle_pause(self, _signal_number: int = -1, _frame=None):
- # print console messages and update the progress bar's status
- if self.pause.value is False:
- self.progress.update(self.task, description=self.description + " (paused)")
- self.progress.stop_task(self.task)
- logger.warning("Processing paused, press Ctrl+Alt+V again to resume")
-
- elif self.pause.value is True:
- self.progress.update(self.task, description=self.description)
- logger.warning("Resuming processing")
- self.progress.start_task(self.task)
-
- # invert the value of the pause flag
- with self.pause.get_lock():
- self.pause.value = not self.pause.value
-
def _run(
self,
- input_path: pathlib.Path,
+ input_path: Path,
width: int,
height: int,
total_frames: int,
frame_rate: float,
- output_path: pathlib.Path,
+ output_path: Path,
output_width: int,
output_height: int,
- Processor: object,
- mode: str,
+ mode: ProcessingMode,
processes: int,
processing_settings: tuple,
) -> None:
@@ -207,51 +175,40 @@ class Video2X:
logger.remove()
logger.add(sys.stderr, colorize=True, format=LOGURU_FORMAT)
- # initialize values
- self.processor_processes = []
- self.processing_queue = multiprocessing.Queue(maxsize=processes * 10)
- processed_frames = multiprocessing.Manager().list([None] * total_frames)
- self.processed = multiprocessing.Value("I", 0)
- self.pause = multiprocessing.Value(ctypes.c_bool, False)
+ # TODO: add docs
+ tasks_queue = Queue(maxsize=processes * 10)
+ processed_frames = Manager().dict()
+ pause_flag = Value(ctypes.c_bool, False)
# set up and start decoder thread
logger.info("Starting video decoder")
- self.decoder = VideoDecoder(
+ decoder = VideoDecoder(
input_path,
width,
height,
frame_rate,
- self.processing_queue,
- processing_settings,
- self.pause,
)
- self.decoder.start()
+ decoder_thread = VideoDecoderThread(tasks_queue, decoder, processing_settings)
+ decoder_thread.start()
# set up and start encoder thread
logger.info("Starting video encoder")
- self.encoder = VideoEncoder(
+ encoder = VideoEncoder(
input_path,
frame_rate * 2 if mode == "interpolate" else frame_rate,
output_path,
output_width,
output_height,
- total_frames,
- processed_frames,
- self.processed,
- self.pause,
)
- self.encoder.start()
- # create processor processes
- for process_name in range(processes):
- process = Processor(self.processing_queue, processed_frames, self.pause)
- process.name = str(process_name)
- process.daemon = True
- process.start()
- self.processor_processes.append(process)
+ # create a pool of processor processes to process the queue
+ processor: Processor = mode.value["processor"](
+ tasks_queue, processed_frames, pause_flag
+ )
+ processor_pool = Pool(processes, processor.process)
# create progress bar
- self.progress = Progress(
+ progress = Progress(
"[progress.description]{task.description}",
BarColumn(complete_style="blue", finished_style="green"),
"[progress.percentage]{task.percentage:>3.0f}%",
@@ -264,23 +221,42 @@ class Video2X:
speed_estimate_period=300.0,
disable=True,
)
+ task = progress.add_task(f"[cyan]{mode.value['label']}", total=total_frames)
- self.description = f"[cyan]{MODE_LABELS.get(mode, 'Unknown')}"
- self.task = self.progress.add_task(self.description, total=total_frames)
+ def _toggle_pause(_signal_number: int = -1, _frame=None):
+
+ # allow the closure to modify external immutable flag
+ nonlocal pause_flag
+
+ # print console messages and update the progress bar's status
+ if pause_flag.value is False:
+ progress.update(
+ task, description=f"[cyan]{mode.value['label']} (paused)"
+ )
+ progress.stop_task(task)
+ logger.warning("Processing paused, press Ctrl+Alt+V again to resume")
+
+ # the lock is already acquired
+ elif pause_flag.value is True:
+ progress.update(task, description=f"[cyan]{mode.value['label']}")
+ logger.warning("Resuming processing")
+ progress.start_task(task)
+
+ # invert the flag
+ with pause_flag.get_lock():
+ pause_flag.value = not pause_flag.value
# allow sending SIGUSR1 to pause/resume processing
- signal.signal(signal.SIGUSR1, self._toggle_pause)
+ signal.signal(signal.SIGUSR1, _toggle_pause)
# enable global pause hotkey if it's supported
if ENABLE_HOTKEY is True:
# create global pause hotkey
- pause_hotkey = pynput.keyboard.HotKey(
- pynput.keyboard.HotKey.parse("++v"), self._toggle_pause
- )
+ pause_hotkey = HotKey(HotKey.parse("++v"), _toggle_pause)
# create global keyboard input listener
- keyboard_listener = pynput.keyboard.Listener(
+ keyboard_listener = Listener(
on_press=(
lambda key: pause_hotkey.press(keyboard_listener.canonical(key))
),
@@ -293,51 +269,52 @@ class Video2X:
keyboard_listener.start()
# a temporary variable that stores the exception
- exception = []
+ exceptions = []
try:
- # wait for jobs in queue to deplete
- while self.processed.value < total_frames - 1:
- time.sleep(1)
+ # let the context manager automatically stop the progress bar
+ with progress:
- # check processor health
- for process in self.processor_processes:
- if not process.is_alive():
- raise Exception("process died unexpectedly")
+ frame_index = 0
+ while frame_index < total_frames:
- # check decoder health
- if not self.decoder.is_alive() and self.decoder.exception is not None:
- raise Exception("decoder died unexpectedly")
+ current_frame = processed_frames.get(frame_index)
- # check encoder health
- if not self.encoder.is_alive() and self.encoder.exception is not None:
- raise Exception("encoder died unexpectedly")
+ if pause_flag.value is True or current_frame is None:
+ time.sleep(0.1)
+ continue
- # show progress bar when upscale starts
- if self.progress.disable is True and self.processed.value > 0:
- self.progress.disable = False
- self.progress.start()
+ # show the progress bar after the processing starts
+ # reduces speed estimation inaccuracies and print overlaps
+ if frame_index == 0:
+ progress.disable = False
+ progress.start()
- # update progress
- if self.pause.value is False:
- self.progress.update(self.task, completed=self.processed.value)
+ if current_frame is True:
+ encoder.write(processed_frames.get(frame_index - 1))
- self.progress.update(self.task, completed=total_frames)
- self.progress.stop()
- logger.info("Processing has completed")
+ else:
+ encoder.write(current_frame)
+
+ if frame_index > 0:
+ del processed_frames[frame_index - 1]
+
+ progress.update(task, completed=frame_index + 1)
+ frame_index += 1
# if SIGTERM is received or ^C is pressed
except (SystemExit, KeyboardInterrupt) as error:
- self.progress.stop()
logger.warning("Exit signal received, exiting gracefully")
logger.warning("Press ^C again to force terminate")
- exception.append(error)
+ exceptions.append(error)
except Exception as error:
- self.progress.stop()
logger.exception(error)
- exception.append(error)
+ exceptions.append(error)
+
+ else:
+ logger.info("Processing has completed")
finally:
@@ -346,31 +323,30 @@ class Video2X:
keyboard_listener.stop()
keyboard_listener.join()
- # stop progress display
- self.progress.stop()
+ # if errors have occurred, kill the FFmpeg processes
+ if len(exceptions) > 0:
+ decoder.kill()
+ encoder.kill()
- # stop processor processes
- logger.info("Stopping processor processes")
- for process in self.processor_processes:
- process.terminate()
+ # stop the decoder
+ decoder_thread.stop()
+ decoder_thread.join()
- # wait for processes to finish
- for process in self.processor_processes:
- process.join()
+ # stop the encoder
+ encoder.join()
- # stop encoder and decoder
- logger.info("Stopping decoder and encoder threads")
- self.decoder.stop()
- self.encoder.stop()
- self.decoder.join()
- self.encoder.join()
+ logger.critical("ENCODER")
- # mark processing queue as closed
- self.processing_queue.close()
+ # clear queue and signal processors to exit
+ # multiprocessing.Queue has no Queue.queue.clear
+ while tasks_queue.empty() is not True:
+ tasks_queue.get()
+ for _ in range(processes):
+ tasks_queue.put(None)
- # raise the error if there is any
- if len(exception) > 0:
- raise exception[0]
+ # close and join the process pool
+ processor_pool.close()
+ processor_pool.join()
# restore original STDOUT and STDERR
sys.stdout = original_stdout
@@ -380,10 +356,14 @@ class Video2X:
logger.remove()
logger.add(sys.stderr, colorize=True, format=LOGURU_FORMAT)
+ # raise the first collected error
+ if len(exceptions) > 0:
+ raise exceptions[0]
+
def upscale(
self,
- input_path: pathlib.Path,
- output_path: pathlib.Path,
+ input_path: Path,
+ output_path: Path,
output_width: int,
output_height: int,
noise: int,
@@ -416,22 +396,21 @@ class Video2X:
output_path,
output_width,
output_height,
- Upscaler,
- "upscale",
+ ProcessingMode.UPSCALE,
processes,
(
output_width,
output_height,
+ algorithm,
noise,
threshold,
- algorithm,
),
)
def interpolate(
self,
- input_path: pathlib.Path,
- output_path: pathlib.Path,
+ input_path: Path,
+ output_path: Path,
processes: int,
threshold: float,
algorithm: str,
@@ -453,192 +432,7 @@ class Video2X:
output_path,
width,
height,
- Interpolator,
- "interpolate",
+ ProcessingMode.INTERPOLATE,
processes,
(threshold, algorithm),
)
-
-
-def parse_arguments() -> argparse.Namespace:
- """
- parse command line arguments
-
- :rtype argparse.Namespace: command parsing results
- """
- parser = argparse.ArgumentParser(
- prog="video2x",
- formatter_class=argparse.ArgumentDefaultsHelpFormatter,
- )
- parser.add_argument(
- "--version", help="show version information and exit", action="store_true"
- )
- parser.add_argument(
- "-i",
- "--input",
- type=pathlib.Path,
- help="input file/directory path",
- required=True,
- )
- parser.add_argument(
- "-o",
- "--output",
- type=pathlib.Path,
- help="output file/directory path",
- required=True,
- )
- parser.add_argument(
- "-p", "--processes", type=int, help="number of processes to launch", default=1
- )
- parser.add_argument(
- "-l",
- "--loglevel",
- choices=["trace", "debug", "info", "success", "warning", "error", "critical"],
- default="info",
- )
-
- # upscaler arguments
- action = parser.add_subparsers(
- help="action to perform", dest="action", required=True
- )
-
- upscale = action.add_parser(
- "upscale",
- help="upscale a file",
- formatter_class=argparse.ArgumentDefaultsHelpFormatter,
- add_help=False,
- )
- upscale.add_argument(
- "--help", action="help", help="show this help message and exit"
- )
- upscale.add_argument("-w", "--width", type=int, help="output width")
- upscale.add_argument("-h", "--height", type=int, help="output height")
- upscale.add_argument("-n", "--noise", type=int, help="denoise level", default=3)
- upscale.add_argument(
- "-a",
- "--algorithm",
- choices=UPSCALING_ALGORITHMS,
- help="algorithm to use for upscaling",
- default=UPSCALING_ALGORITHMS[0],
- )
- upscale.add_argument(
- "-t",
- "--threshold",
- type=float,
- help=(
- "skip if the percent difference between two adjacent frames is below this"
- " value; set to 0 to process all frames"
- ),
- default=0,
- )
-
- # interpolator arguments
- interpolate = action.add_parser(
- "interpolate",
- help="interpolate frames for file",
- formatter_class=argparse.ArgumentDefaultsHelpFormatter,
- add_help=False,
- )
- interpolate.add_argument(
- "--help", action="help", help="show this help message and exit"
- )
- interpolate.add_argument(
- "-a",
- "--algorithm",
- choices=UPSCALING_ALGORITHMS,
- help="algorithm to use for upscaling",
- default=INTERPOLATION_ALGORITHMS[0],
- )
- interpolate.add_argument(
- "-t",
- "--threshold",
- type=float,
- help=(
- "skip if the percent difference between two adjacent frames exceeds this"
- " value; set to 100 to interpolate all frames"
- ),
- default=10,
- )
-
- return parser.parse_args()
-
-
-def main() -> int:
- """
- command line entrypoint for direct CLI invocation
-
- :rtype int: 0 if completed successfully, else other int
- """
-
- try:
- # display version and lawful informaition
- if "--version" in sys.argv:
- rich_print(LEGAL_INFO)
- return 0
-
- # parse command line arguments
- args = parse_arguments()
-
- # check input/output file paths
- if not args.input.exists():
- logger.critical(f"Cannot find input file: {args.input}")
- return 1
- if not args.input.is_file():
- logger.critical("Input path is not a file")
- return 1
- if not args.output.parent.exists():
- logger.critical(f"Output directory does not exist: {args.output.parent}")
- return 1
-
- # set logger level
- if os.environ.get("LOGURU_LEVEL") is None:
- os.environ["LOGURU_LEVEL"] = args.loglevel.upper()
-
- # remove default handler
- logger.remove()
-
- # add new sink with custom handler
- logger.add(sys.stderr, colorize=True, format=LOGURU_FORMAT)
-
- # print package version and copyright notice
- logger.opt(colors=True).info(f"Video2X {__version__}")
- logger.opt(colors=True).info(
- "Copyright (C) 2018-2022 K4YT3X and contributors."
- )
-
- # initialize video2x object
- video2x = Video2X()
-
- if args.action == "upscale":
- video2x.upscale(
- args.input,
- args.output,
- args.width,
- args.height,
- args.noise,
- args.processes,
- args.threshold,
- args.algorithm,
- )
-
- elif args.action == "interpolate":
- video2x.interpolate(
- args.input,
- args.output,
- args.processes,
- args.threshold,
- args.algorithm,
- )
-
- # don't print the traceback for manual terminations
- except KeyboardInterrupt:
- return 2
-
- except Exception as error:
- logger.exception(error)
- return 1
-
- # if no exceptions were produced
- else:
- logger.success("Processing completed successfully")
- return 0