新增视频场景检测

This commit is contained in:
YaoFANGUK
2023-12-12 17:06:05 +08:00
parent 1cb7705660
commit 6d741cd9e4
27 changed files with 9139 additions and 0 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,548 @@
# -*- coding: utf-8 -*-
#
# PySceneDetect: Python-Based Video Scene Detector
# -------------------------------------------------------------------
# [ Site: https://scenedetect.com ]
# [ Docs: https://scenedetect.com/docs/ ]
# [ Github: https://github.com/Breakthrough/PySceneDetect/ ]
#
# Copyright (C) 2014-2023 Brandon Castellano <http://www.bcastell.com>.
# PySceneDetect is licensed under the BSD 3-Clause License; see the
# included LICENSE file, or visit one of the above pages for details.
#
"""Handles loading configuration files from disk and validating each section. Only validation of the
config file schema and data types are performed. Constants/defaults are also defined here where
possible and re-used by the CLI so that there is one source of truth.
"""
from abc import ABC, abstractmethod
import logging
import os
import os.path
from configparser import ConfigParser, ParsingError
from typing import Any, AnyStr, Dict, List, Optional, Tuple, Union
from platformdirs import user_config_dir
from scenedetect.detectors import ContentDetector
from scenedetect.frame_timecode import FrameTimecode
from scenedetect.scene_manager import Interpolation
from scenedetect.video_splitter import DEFAULT_FFMPEG_ARGS
VALID_PYAV_THREAD_MODES = ['NONE', 'SLICE', 'FRAME', 'AUTO']
class OptionParseFailure(Exception):
"""Raised when a value provided in a user config file fails validation."""
def __init__(self, error):
super().__init__()
self.error = error
class ValidatedValue(ABC):
"""Used to represent configuration values that must be validated against constraints."""
@property
@abstractmethod
def value(self) -> Any:
"""Get the value after validation."""
raise NotImplementedError()
@staticmethod
@abstractmethod
def from_config(config_value: str, default: 'ValidatedValue') -> 'ValidatedValue':
"""Validate and get the user-specified configuration option.
Raises:
OptionParseFailure: Value from config file did not meet validation constraints.
"""
raise NotImplementedError()
class TimecodeValue(ValidatedValue):
"""Validator for timecode values in frames (1234), seconds (123.4s), or HH:MM:SS.
Stores value in original representation."""
def __init__(self, value: Union[int, float, str]):
# Ensure value is a valid timecode.
FrameTimecode(timecode=value, fps=100.0)
self._value = value
@property
def value(self) -> Union[int, float, str]:
return self._value
def __repr__(self) -> str:
return str(self.value)
def __str__(self) -> str:
return str(self.value)
@staticmethod
def from_config(config_value: str, default: 'TimecodeValue') -> 'TimecodeValue':
try:
return TimecodeValue(config_value)
except ValueError as ex:
raise OptionParseFailure(
'Timecodes must be in frames (1234), seconds (123.4s), or HH:MM:SS (00:02:03.400).'
) from ex
class RangeValue(ValidatedValue):
"""Validator for int/float ranges. `min_val` and `max_val` are inclusive."""
def __init__(
self,
value: Union[int, float],
min_val: Union[int, float],
max_val: Union[int, float],
):
if value < min_val or value > max_val:
# min and max are inclusive.
raise ValueError()
self._value = value
self._min_val = min_val
self._max_val = max_val
@property
def value(self) -> Union[int, float]:
return self._value
@property
def min_val(self) -> Union[int, float]:
"""Minimum value of the range."""
return self._min_val
@property
def max_val(self) -> Union[int, float]:
"""Maximum value of the range."""
return self._max_val
def __repr__(self) -> str:
return str(self.value)
def __str__(self) -> str:
return str(self.value)
@staticmethod
def from_config(config_value: str, default: 'RangeValue') -> 'RangeValue':
try:
return RangeValue(
value=int(config_value) if isinstance(default.value, int) else float(config_value),
min_val=default.min_val,
max_val=default.max_val,
)
except ValueError as ex:
raise OptionParseFailure('Value must be between %s and %s.' %
(default.min_val, default.max_val)) from ex
class ScoreWeightsValue(ValidatedValue):
"""Validator for score weight values (currently a tuple of four numbers)."""
_IGNORE_CHARS = [',', '/', '(', ')']
"""Characters to ignore."""
def __init__(self, value: Union[str, ContentDetector.Components]):
if isinstance(value, ContentDetector.Components):
self._value = value
else:
translation_table = str.maketrans(
{char: ' ' for char in ScoreWeightsValue._IGNORE_CHARS})
values = value.translate(translation_table).split()
if not len(values) == 4:
raise ValueError("Score weights must be specified as four numbers!")
self._value = ContentDetector.Components(*(float(val) for val in values))
@property
def value(self) -> Tuple[float, float, float, float]:
return self._value
def __repr__(self) -> str:
return str(self.value)
def __str__(self) -> str:
return '%.3f, %.3f, %.3f, %.3f' % self.value
@staticmethod
def from_config(config_value: str, default: 'ScoreWeightsValue') -> 'ScoreWeightsValue':
try:
return ScoreWeightsValue(config_value)
except ValueError as ex:
raise OptionParseFailure(
'Score weights must be specified as four numbers in the form (H,S,L,E),'
' e.g. (0.9, 0.2, 2.0, 0.5). Commas/brackets/slashes are ignored.') from ex
class KernelSizeValue(ValidatedValue):
"""Validator for kernel sizes (odd integer > 1, or -1 for auto size)."""
def __init__(self, value: int):
if value == -1:
# Downscale factor of -1 maps to None internally for auto downscale.
value = None
elif value < 0:
# Disallow other negative values.
raise ValueError()
elif value % 2 == 0:
# Disallow even values.
raise ValueError()
self._value = value
@property
def value(self) -> int:
return self._value
def __repr__(self) -> str:
return str(self.value)
def __str__(self) -> str:
if self.value is None:
return 'auto'
return str(self.value)
@staticmethod
def from_config(config_value: str, default: 'KernelSizeValue') -> 'KernelSizeValue':
try:
return KernelSizeValue(int(config_value))
except ValueError as ex:
raise OptionParseFailure(
'Value must be an odd integer greater than 1, or set to -1 for auto kernel size.'
) from ex
ConfigValue = Union[bool, int, float, str]
ConfigDict = Dict[str, Dict[str, ConfigValue]]
_CONFIG_FILE_NAME: AnyStr = 'scenedetect.cfg'
_CONFIG_FILE_DIR: AnyStr = user_config_dir("PySceneDetect", False)
CONFIG_FILE_PATH: AnyStr = os.path.join(_CONFIG_FILE_DIR, _CONFIG_FILE_NAME)
CONFIG_MAP: ConfigDict = {
'backend-opencv': {
'max-decode-attempts': 5,
},
'backend-pyav': {
'suppress-output': False,
'threading-mode': 'auto',
},
'scene_detect-adaptive': {
'frame-window': 2,
'kernel-size': KernelSizeValue(-1),
'luma-only': False,
'min-content-val': RangeValue(15.0, min_val=0.0, max_val=255.0),
'min-scene-len': TimecodeValue(0),
'threshold': RangeValue(3.0, min_val=0.0, max_val=255.0),
'weights': ScoreWeightsValue(ContentDetector.DEFAULT_COMPONENT_WEIGHTS),
# TODO(v0.7): Remove `min-delta-hsv``.
'min-delta-hsv': RangeValue(15.0, min_val=0.0, max_val=255.0),
},
'scene_detect-content': {
'kernel-size': KernelSizeValue(-1),
'luma-only': False,
'min-scene-len': TimecodeValue(0),
'threshold': RangeValue(27.0, min_val=0.0, max_val=255.0),
'weights': ScoreWeightsValue(ContentDetector.DEFAULT_COMPONENT_WEIGHTS),
},
'scene_detect-threshold': {
'add-last-scene': True,
'fade-bias': RangeValue(0, min_val=-100.0, max_val=100.0),
'min-scene-len': TimecodeValue(0),
'threshold': RangeValue(12.0, min_val=0.0, max_val=255.0),
},
'load-scenes': {
'start-col-name': 'Start Frame',
},
'export-html': {
'filename': '$VIDEO_NAME-Scenes.html',
'image-height': 0,
'image-width': 0,
'no-images': False,
},
'list-scenes': {
'output': '',
'filename': '$VIDEO_NAME-Scenes.csv',
'no-output-file': False,
'quiet': False,
'skip-cuts': False,
},
'global': {
'backend': 'opencv',
'default-detector': 'scene_detect-adaptive',
'downscale': 0,
'downscale-method': 'linear',
'drop-short-scenes': False,
'frame-skip': 0,
'merge-last-scene': False,
'min-scene-len': TimecodeValue('0.6s'),
'output': '',
'verbosity': 'info',
},
'save-images': {
'compression': RangeValue(3, min_val=0, max_val=9),
'filename': '$VIDEO_NAME-Scene-$SCENE_NUMBER-$IMAGE_NUMBER',
'format': 'jpeg',
'frame-margin': 1,
'height': 0,
'num-images': 3,
'output': '',
'quality': RangeValue(0, min_val=0, max_val=100), # Default depends on format
'scale': 1.0,
'scale-method': 'linear',
'width': 0,
},
'split-video': {
'args': DEFAULT_FFMPEG_ARGS,
'copy': False,
'filename': '$VIDEO_NAME-Scene-$SCENE_NUMBER',
'high-quality': False,
'mkvmerge': False,
'output': '',
'preset': 'veryfast',
'quiet': False,
'rate-factor': RangeValue(22, min_val=0, max_val=100),
},
}
"""Mapping of valid configuration file parameters and their default values or placeholders.
The types of these values are used when decoding the configuration file. Valid choices for
certain string options are stored in `CHOICE_MAP`."""
CHOICE_MAP: Dict[str, Dict[str, List[str]]] = {
'global': {
'backend': ['opencv', 'pyav', 'moviepy'],
'default-detector': ['scene_detect-adaptive', 'scene_detect-content', 'scene_detect-threshold'],
'downscale-method': [value.name.lower() for value in Interpolation],
'verbosity': ['debug', 'info', 'warning', 'error', 'none'],
},
'split-video': {
'preset': [
'ultrafast', 'superfast', 'veryfast', 'faster', 'fast', 'medium', 'slow', 'slower',
'veryslow'
],
},
'save-images': {
'format': ['jpeg', 'png', 'webp'],
'scale-method': [value.name.lower() for value in Interpolation],
},
'backend-pyav': {
'threading_mode': [str(mode).lower() for mode in VALID_PYAV_THREAD_MODES],
},
}
"""Mapping of string options which can only be of a particular set of values. We use a list instead
of a set to preserve order when generating error contexts. Values are case-insensitive, and must be
in lowercase in this map."""
def _validate_structure(config: ConfigParser) -> List[str]:
"""Validates the layout of the section/option mapping.
Returns:
List of any parsing errors in human-readable form.
"""
errors: List[str] = []
for section in config.sections():
if not section in CONFIG_MAP.keys():
errors.append('Unsupported config section: [%s]' % (section))
continue
for (option_name, _) in config.items(section):
if not option_name in CONFIG_MAP[section].keys():
errors.append('Unsupported config option in [%s]: %s' % (section, option_name))
return errors
def _parse_config(config: ConfigParser) -> Tuple[ConfigDict, List[str]]:
"""Process the given configuration into a key-value mapping.
Returns:
Configuration mapping and list of any processing errors in human readable form.
"""
out_map: ConfigDict = {}
errors: List[str] = []
for command in CONFIG_MAP:
out_map[command] = {}
for option in CONFIG_MAP[command]:
if command in config and option in config[command]:
try:
value_type = None
if isinstance(CONFIG_MAP[command][option], bool):
value_type = 'yes/no value'
out_map[command][option] = config.getboolean(command, option)
continue
elif isinstance(CONFIG_MAP[command][option], int):
value_type = 'integer'
out_map[command][option] = config.getint(command, option)
continue
elif isinstance(CONFIG_MAP[command][option], float):
value_type = 'number'
out_map[command][option] = config.getfloat(command, option)
continue
except ValueError as _:
errors.append('Invalid [%s] value for %s: %s is not a valid %s.' %
(command, option, config.get(command, option), value_type))
continue
# Handle custom validation types.
config_value = config.get(command, option)
default = CONFIG_MAP[command][option]
option_type = type(default)
if issubclass(option_type, ValidatedValue):
try:
out_map[command][option] = option_type.from_config(
config_value=config_value, default=default)
except OptionParseFailure as ex:
errors.append('Invalid [%s] value for %s:\n %s\n%s' %
(command, option, config_value, ex.error))
continue
# If we didn't process the value as a given type, handle it as a string. We also
# replace newlines with spaces, and strip any remaining leading/trailing whitespace.
if value_type is None:
config_value = config.get(command, option).replace('\n', ' ').strip()
if command in CHOICE_MAP and option in CHOICE_MAP[command]:
if config_value.lower() not in CHOICE_MAP[command][option]:
errors.append('Invalid [%s] value for %s: %s. Must be one of: %s.' %
(command, option, config.get(command, option), ', '.join(
choice for choice in CHOICE_MAP[command][option])))
continue
out_map[command][option] = config_value
continue
return (out_map, errors)
class ConfigLoadFailure(Exception):
"""Raised when a user-specified configuration file fails to be loaded or validated."""
def __init__(self, init_log: Tuple[int, str], reason: Optional[Exception] = None):
super().__init__()
self.init_log = init_log
self.reason = reason
class ConfigRegistry:
def __init__(self, path: Optional[str] = None, throw_exception: bool = True):
self._config: ConfigDict = {} # Options set in the loaded config file.
self._init_log: List[Tuple[int, str]] = []
self._initialized = False
try:
self._load_from_disk(path)
self._initialized = True
except ConfigLoadFailure as ex:
if throw_exception:
raise
# If we fail to load the user config file, ensure the object is flagged as
# uninitialized, and log the error so it can be dealt with if necessary.
self._init_log = ex.init_log
if ex.reason is not None:
self._init_log += [
(logging.ERROR, 'Error: %s' % str(ex.reason).replace('\t', ' ')),
]
self._initialized = False
@property
def config_dict(self) -> ConfigDict:
"""Current configuration options that are set for each command."""
return self._config
@property
def initialized(self) -> bool:
"""True if the ConfigRegistry was constructed without errors, False otherwise."""
return self._initialized
def get_init_log(self):
"""Get initialization log. Consumes the log, so subsequent calls will return None."""
init_log = self._init_log
self._init_log = []
return init_log
def _log(self, log_level, log_str):
self._init_log.append((log_level, log_str))
def _load_from_disk(self, path=None):
# Validate `path`, or if not provided, use CONFIG_FILE_PATH if it exists.
if path:
self._init_log.append((logging.INFO, "Loading config from file:\n %s" % path))
if not os.path.exists(path):
self._init_log.append((logging.ERROR, "File not found: %s" % (path)))
raise ConfigLoadFailure(self._init_log)
else:
# Gracefully handle the case where there isn't a user config file.
if not os.path.exists(CONFIG_FILE_PATH):
self._init_log.append((logging.DEBUG, "User config file not found."))
return
path = CONFIG_FILE_PATH
self._init_log.append((logging.INFO, "Loading user config file:\n %s" % path))
# Try to load and parse the config file at `path`.
config = ConfigParser()
try:
with open(path, 'r') as config_file:
config_file_contents = config_file.read()
config.read_string(config_file_contents, source=path)
except ParsingError as ex:
raise ConfigLoadFailure(self._init_log, reason=ex)
except OSError as ex:
raise ConfigLoadFailure(self._init_log, reason=ex)
# At this point the config file syntax is correct, but we need to still validate
# the parsed options (i.e. that the options have valid values).
errors = _validate_structure(config)
if not errors:
self._config, errors = _parse_config(config)
if errors:
for log_str in errors:
self._init_log.append((logging.ERROR, log_str))
raise ConfigLoadFailure(self._init_log)
def is_default(self, command: str, option: str) -> bool:
"""True if specified config option is unset (i.e. the default), False otherwise."""
return not (command in self._config and option in self._config[command])
def get_value(self,
command: str,
option: str,
override: Optional[ConfigValue] = None,
ignore_default: bool = False) -> ConfigValue:
"""Get the current setting or default value of the specified command option."""
assert command in CONFIG_MAP and option in CONFIG_MAP[command]
if override is not None:
return override
if command in self._config and option in self._config[command]:
value = self._config[command][option]
else:
value = CONFIG_MAP[command][option]
if ignore_default:
return None
if issubclass(type(value), ValidatedValue):
return value.value
return value
def get_help_string(self,
command: str,
option: str,
show_default: Optional[bool] = None) -> str:
"""Get a string to specify for the help text indicating the current command option value,
if set, or the default.
Arguments:
command: A command name or, "global" for global options.
option: Command-line option to set within `command`.
show_default: Always show default value. Default is False for flag/bool values,
True otherwise.
"""
assert command in CONFIG_MAP and option in CONFIG_MAP[command]
is_flag = isinstance(CONFIG_MAP[command][option], bool)
if command in self._config and option in self._config[command]:
if is_flag:
value_str = 'on' if self._config[command][option] else 'off'
else:
value_str = str(self._config[command][option])
return ' [setting: %s]' % (value_str)
if show_default is False or (show_default is None and is_flag
and CONFIG_MAP[command][option] is False):
return ''
return ' [default: %s]' % (str(CONFIG_MAP[command][option]))

View File

@@ -0,0 +1,820 @@
# -*- coding: utf-8 -*-
#
# PySceneDetect: Python-Based Video Scene Detector
# -------------------------------------------------------------------
# [ Site: https://scenedetect.com ]
# [ Docs: https://scenedetect.com/docs/ ]
# [ Github: https://github.com/Breakthrough/PySceneDetect/ ]
#
# Copyright (C) 2014-2023 Brandon Castellano <http://www.bcastell.com>.
# PySceneDetect is licensed under the BSD 3-Clause License; see the
# included LICENSE file, or visit one of the above pages for details.
#
"""Context of which command-line options and config settings the user provided."""
import logging
import os
from typing import Any, AnyStr, Dict, Optional, Tuple, Type
import click
import scenedetect
from scenedetect import open_video, AVAILABLE_BACKENDS
from scenedetect._scene_loader import SceneLoader
from scenedetect.scene_detector import SceneDetector
from scenedetect.platform import get_and_create_path, get_cv2_imwrite_params, init_logger
from scenedetect.frame_timecode import FrameTimecode, MAX_FPS_DELTA
from scenedetect.video_stream import VideoStream, VideoOpenFailure, FrameRateUnavailable
from scenedetect.video_splitter import is_mkvmerge_available, is_ffmpeg_available
from scenedetect.detectors import AdaptiveDetector, ContentDetector, ThresholdDetector
from scenedetect.stats_manager import StatsManager
from scenedetect.scene_manager import SceneManager, Interpolation
from scenedetect._cli.config import ConfigRegistry, ConfigLoadFailure, CHOICE_MAP
logger = logging.getLogger('pyscenedetect')
USER_CONFIG = ConfigRegistry(throw_exception=False)
def parse_timecode(value: str,
frame_rate: float,
first_index_is_one: bool = False) -> FrameTimecode:
"""Parses a user input string into a FrameTimecode assuming the given framerate.
If value is None, None will be returned instead of processing the value.
Raises:
click.BadParameter
"""
if value is None:
return None
try:
if first_index_is_one and value.isdigit():
value = int(value)
if value >= 1:
value -= 1
return FrameTimecode(timecode=value, fps=frame_rate)
except ValueError as ex:
raise click.BadParameter(
'timecode must be in frames (1234), seconds (123.4s), or HH:MM:SS (00:02:03.400)'
) from ex
def contains_sequence_or_url(video_path: str) -> bool:
"""Checks if the video path is a URL or image sequence."""
return '%' in video_path or '://' in video_path
def check_split_video_requirements(use_mkvmerge: bool) -> None:
""" Validates that the proper tool is available on the system to perform the
`split-video` command.
Arguments:
use_mkvmerge: True if mkvmerge (-m), False otherwise.
Raises: click.BadParameter if the proper video splitting tool cannot be found.
"""
if (use_mkvmerge and not is_mkvmerge_available()) or not is_ffmpeg_available():
error_strs = [
"{EXTERN_TOOL} is required for split-video{EXTRA_ARGS}.".format(
EXTERN_TOOL='mkvmerge' if use_mkvmerge else 'ffmpeg',
EXTRA_ARGS=' when mkvmerge (-m) is set' if use_mkvmerge else '')
]
error_strs += ['Ensure the program is available on your system and try again.']
if not use_mkvmerge and is_mkvmerge_available():
error_strs += ['You can specify mkvmerge (-m) to use mkvmerge for splitting.']
elif use_mkvmerge and is_ffmpeg_available():
error_strs += ['You can specify copy (-c) to use ffmpeg stream copying.']
error_str = '\n'.join(error_strs)
raise click.BadParameter(error_str, param_hint='split-video')
# pylint: disable=too-many-instance-attributes,too-many-arguments,too-many-locals
class CliContext:
"""Context of the command-line interface and config file parameters passed between sub-commands.
Handles validation of options taken in from the CLI *and* configuration files.
After processing the main program options via `handle_options`, the CLI will then call
the respective `handle_*` method for each command. Once all commands have been
processed, the main program actions are executed by passing this object to the
`run_scenedetect` function in `scenedetect.cli.controller`.
"""
def __init__(self):
self.config = USER_CONFIG
self.video_stream: VideoStream = None
self.scene_manager: SceneManager = None
self.stats_manager: StatsManager = None
# Global `scenedetect` Options
self.output_directory: str = None # -o/--output
self.quiet_mode: bool = None # -q/--quiet or -v/--verbosity quiet
self.stats_file_path: str = None # -s/--stats
self.drop_short_scenes: bool = None # --drop-short-scenes
self.merge_last_scene: bool = None # --merge-last-scene
self.min_scene_len: FrameTimecode = None # -m/--min-scene-len
self.frame_skip: int = None # -fs/--frame-skip
self.default_detector: Tuple[Type[SceneDetector],
Dict[str, Any]] = None # [global] default-detector
# `time` Command Options
self.time: bool = False
self.start_time: FrameTimecode = None # time -s/--start
self.end_time: FrameTimecode = None # time -e/--end
self.duration: FrameTimecode = None # time -d/--duration
# `save-images` Command Options
self.save_images: bool = False
self.image_extension: str = None # save-images -j/--jpeg, -w/--webp, -p/--png
self.image_directory: str = None # save-images -o/--output
self.image_param: int = None # save-images -q/--quality if -j/-w,
# otherwise -c/--compression if -p
self.image_name_format: str = None # save-images -f/--name-format
self.num_images: int = None # save-images -n/--num-images
self.frame_margin: int = 1 # save-images -m/--frame-margin
self.scale: float = None # save-images -s/--scale
self.height: int = None # save-images -h/--height
self.width: int = None # save-images -w/--width
self.scale_method: Interpolation = None # [save-images] scale-method
# `split-video` Command Options
self.split_video: bool = False
self.split_mkvmerge: bool = None # split-video -m/--mkvmerge
self.split_args: str = None # split-video -a/--args, -c/--copy
self.split_directory: str = None # split-video -o/--output
self.split_name_format: str = None # split-video -f/--filename
self.split_quiet: bool = None # split-video -q/--quiet
# `list-scenes` Command Options
self.list_scenes: bool = False
self.print_scene_list: bool = None # list-scenes -q/--quiet
self.scene_list_directory: str = None # list-scenes -o/--output
self.scene_list_name_format: str = None # list-scenes -f/--filename
self.scene_list_output: bool = None # list-scenes -n/--no-output
self.skip_cuts: bool = None # list-scenes -s/--skip-cuts
# `export-html` Command Options
self.export_html: bool = False
self.html_name_format: str = None # export-html -f/--filename
self.html_include_images: bool = None # export-html --no-images
self.image_width: int = None # export-html -w/--image-width
self.image_height: int = None # export-html -h/--image-height
#
# Command Handlers
#
def handle_options(
self,
input_path: AnyStr,
output: Optional[AnyStr],
framerate: float,
stats_file: Optional[AnyStr],
downscale: Optional[int],
frame_skip: int,
min_scene_len: str,
drop_short_scenes: bool,
merge_last_scene: bool,
backend: Optional[str],
quiet: bool,
logfile: Optional[AnyStr],
config: Optional[AnyStr],
stats: Optional[AnyStr],
verbosity: Optional[str],
):
"""Parse all global options/arguments passed to the main scenedetect command,
before other sub-commands (e.g. this function processes the [options] when calling
`scenedetect [options] [commands [command options]]`).
Raises:
click.BadParameter: One of the given options/parameters is invalid.
click.Abort: Fatal initialization failure.
"""
# TODO(v1.0): Make the stats value optional (e.g. allow -s only), and allow use of
# $VIDEO_NAME macro in the name. Default to $VIDEO_NAME.csv.
try:
init_failure = not self.config.initialized
init_log = self.config.get_init_log()
quiet = not init_failure and quiet
self._initialize_logging(quiet=quiet, verbosity=verbosity, logfile=logfile)
# Configuration file was specified via CLI argument -c/--config.
if config and not init_failure:
self.config = ConfigRegistry(config)
init_log += self.config.get_init_log()
# Re-initialize logger with the correct verbosity.
if verbosity is None and not self.config.is_default('global', 'verbosity'):
verbosity_str = self.config.get_value('global', 'verbosity')
assert verbosity_str in CHOICE_MAP['global']['verbosity']
self.quiet_mode = False
self._initialize_logging(verbosity=verbosity_str, logfile=logfile)
except ConfigLoadFailure as ex:
init_failure = True
init_log += ex.init_log
if ex.reason is not None:
init_log += [(logging.ERROR, 'Error: %s' % str(ex.reason).replace('\t', ' '))]
finally:
# Make sure we print the version number even on any kind of init failure.
logger.info('PySceneDetect %s', scenedetect.__version__)
for (log_level, log_str) in init_log:
logger.log(log_level, log_str)
if init_failure:
logger.critical("Error processing configuration file.")
raise click.Abort()
if self.config.config_dict:
logger.debug("Current configuration:\n%s", str(self.config.config_dict))
logger.debug('Parsing program options.')
if stats is not None and frame_skip:
error_strs = [
'Unable to scene_detect scenes with stats file if frame skip is not 0.',
' Either remove the -fs/--frame-skip option, or the -s/--stats file.\n'
]
logger.error('\n'.join(error_strs))
raise click.BadParameter(
'Combining the -s/--stats and -fs/--frame-skip options is not supported.',
param_hint='frame skip + stats file')
# Handle the case where -i/--input was not specified (e.g. for the `help` command).
if input_path is None:
return
# Have to load the input video to obtain a time base before parsing timecodes.
self._open_video_stream(
input_path=input_path,
framerate=framerate,
backend=self.config.get_value("global", "backend", backend, ignore_default=True))
self.output_directory = output if output else self.config.get_value("global", "output")
if self.output_directory:
logger.info('Output directory set:\n %s', self.output_directory)
self.min_scene_len = parse_timecode(
min_scene_len if min_scene_len is not None else self.config.get_value(
"global", "min-scene-len"), self.video_stream.frame_rate)
self.drop_short_scenes = drop_short_scenes or self.config.get_value(
"global", "drop-short-scenes")
self.merge_last_scene = merge_last_scene or self.config.get_value(
"global", "merge-last-scene")
self.frame_skip = self.config.get_value("global", "frame-skip", frame_skip)
# Create StatsManager if --stats is specified.
if stats_file:
self.stats_file_path = get_and_create_path(stats_file, self.output_directory)
self.stats_manager = StatsManager()
# Initialize default detector with values in the config file.
default_detector = self.config.get_value("global", "default-detector")
if default_detector == 'scene_detect-adaptive':
self.default_detector = (AdaptiveDetector, self.get_detect_adaptive_params())
elif default_detector == 'scene_detect-content':
self.default_detector = (ContentDetector, self.get_detect_content_params())
elif default_detector == 'scene_detect-threshold':
self.default_detector = (ThresholdDetector, self.get_detect_threshold_params())
else:
raise click.BadParameter("Unknown detector type!", param_hint='default-detector')
logger.debug('Initializing SceneManager.')
scene_manager = SceneManager(self.stats_manager)
if downscale is None and self.config.is_default("global", "downscale"):
scene_manager.auto_downscale = True
else:
scene_manager.auto_downscale = False
downscale = self.config.get_value("global", "downscale", downscale)
try:
scene_manager.downscale = downscale
except ValueError as ex:
logger.debug(str(ex))
raise click.BadParameter(str(ex), param_hint='downscale factor')
scene_manager.interpolation = Interpolation[self.config.get_value(
'global', 'downscale-method').upper()]
self.scene_manager = scene_manager
def get_detect_content_params(
self,
threshold: Optional[float] = None,
luma_only: bool = None,
min_scene_len: Optional[str] = None,
weights: Optional[Tuple[float, float, float, float]] = None,
kernel_size: Optional[int] = None,
) -> Dict[str, Any]:
"""Handle scene_detect-content command options and return dict to construct one with."""
self._ensure_input_open()
if self.drop_short_scenes:
min_scene_len = 0
else:
if min_scene_len is None:
if self.config.is_default('scene_detect-content', 'min-scene-len'):
min_scene_len = self.min_scene_len.frame_num
else:
min_scene_len = self.config.get_value('scene_detect-content', 'min-scene-len')
min_scene_len = parse_timecode(min_scene_len, self.video_stream.frame_rate).frame_num
if weights is not None:
try:
weights = ContentDetector.Components(*weights)
except ValueError as ex:
logger.debug(str(ex))
raise click.BadParameter(str(ex), param_hint='weights')
return {
'weights': self.config.get_value('scene_detect-content', 'weights', weights),
'kernel_size': self.config.get_value('scene_detect-content', 'kernel-size', kernel_size),
'luma_only': luma_only or self.config.get_value('scene_detect-content', 'luma-only'),
'min_scene_len': min_scene_len,
'threshold': self.config.get_value('scene_detect-content', 'threshold', threshold),
}
def get_detect_adaptive_params(
self,
threshold: Optional[float] = None,
min_content_val: Optional[float] = None,
frame_window: Optional[int] = None,
luma_only: bool = None,
min_scene_len: Optional[str] = None,
weights: Optional[Tuple[float, float, float, float]] = None,
kernel_size: Optional[int] = None,
min_delta_hsv: Optional[float] = None,
) -> Dict[str, Any]:
"""Handle scene_detect-adaptive command options and return dict to construct one with."""
self._ensure_input_open()
# TODO(v0.7): Remove these branches when removing -d/--min-delta-hsv.
if min_delta_hsv is not None:
logger.error('-d/--min-delta-hsv is deprecated, use -c/--min-content-val instead.')
if min_content_val is None:
min_content_val = min_delta_hsv
# Handle case where deprecated min-delta-hsv is set, and use it to set min-content-val.
if not self.config.is_default("scene_detect-adaptive", "min-delta-hsv"):
logger.error('[scene_detect-adaptive] config file option `min-delta-hsv` is deprecated'
', use `min-delta-hsv` instead.')
if self.config.is_default("scene_detect-adaptive", "min-content-val"):
self.config.config_dict["scene_detect-adaptive"]["min-content-val"] = (
self.config.config_dict["scene_detect-adaptive"]["min-deleta-hsv"])
if self.drop_short_scenes:
min_scene_len = 0
else:
if min_scene_len is None:
if self.config.is_default("scene_detect-adaptive", "min-scene-len"):
min_scene_len = self.min_scene_len.frame_num
else:
min_scene_len = self.config.get_value("scene_detect-adaptive", "min-scene-len")
min_scene_len = parse_timecode(min_scene_len, self.video_stream.frame_rate).frame_num
if weights is not None:
try:
weights = ContentDetector.Components(*weights)
except ValueError as ex:
logger.debug(str(ex))
raise click.BadParameter(str(ex), param_hint='weights')
return {
'adaptive_threshold':
self.config.get_value("scene_detect-adaptive", "threshold", threshold),
'weights':
self.config.get_value("scene_detect-adaptive", "weights", weights),
'kernel_size':
self.config.get_value("scene_detect-adaptive", "kernel-size", kernel_size),
'luma_only':
luma_only or self.config.get_value("scene_detect-adaptive", "luma-only"),
'min_content_val':
self.config.get_value("scene_detect-adaptive", "min-content-val", min_content_val),
'min_scene_len':
min_scene_len,
'window_width':
self.config.get_value("scene_detect-adaptive", "frame-window", frame_window),
}
def get_detect_threshold_params(
self,
threshold: Optional[float] = None,
fade_bias: Optional[float] = None,
add_last_scene: bool = None,
min_scene_len: Optional[str] = None,
) -> Dict[str, Any]:
"""Handle scene_detect-threshold command options and return dict to construct one with."""
self._ensure_input_open()
if self.drop_short_scenes:
min_scene_len = 0
else:
if min_scene_len is None:
if self.config.is_default("scene_detect-threshold", "min-scene-len"):
min_scene_len = self.min_scene_len.frame_num
else:
min_scene_len = self.config.get_value("scene_detect-threshold", "min-scene-len")
min_scene_len = parse_timecode(min_scene_len, self.video_stream.frame_rate).frame_num
return {
# TODO(v1.0): add_last_scene cannot be disabled right now.
'add_final_scene':
add_last_scene or self.config.get_value("scene_detect-threshold", "add-last-scene"),
'fade_bias':
self.config.get_value("scene_detect-threshold", "fade-bias", fade_bias),
'min_scene_len':
min_scene_len,
'threshold':
self.config.get_value("scene_detect-threshold", "threshold", threshold),
}
def handle_load_scenes(self, input: AnyStr, start_col_name: Optional[str]):
"""Handle `load-scenes` command options."""
self._ensure_input_open()
start_col_name = self.config.get_value("load-scenes", "start-col-name", start_col_name)
self.add_detector(
SceneLoader(
file=input, framerate=self.video_stream.frame_rate, start_col_name=start_col_name))
def handle_export_html(
self,
filename: Optional[AnyStr],
no_images: bool,
image_width: Optional[int],
image_height: Optional[int],
):
"""Handle `export-html` command options."""
self._ensure_input_open()
if self.export_html:
self._on_duplicate_command('export_html')
no_images = no_images or self.config.get_value('export-html', 'no-images')
self.html_include_images = not no_images
self.html_name_format = self.config.get_value('export-html', 'filename', filename)
self.image_width = self.config.get_value('export-html', 'image-width', image_width)
self.image_height = self.config.get_value('export-html', 'image-height', image_height)
if not self.save_images and not no_images:
raise click.BadArgumentUsage(
'The export-html command requires that the save-images command\n'
'is specified before it, unless --no-images is specified.')
logger.info('HTML file name format:\n %s', filename)
self.export_html = True
def handle_list_scenes(
self,
output: Optional[AnyStr],
filename: Optional[AnyStr],
no_output_file: bool,
quiet: bool,
skip_cuts: bool,
):
"""Handle `list-scenes` command options."""
self._ensure_input_open()
if self.list_scenes:
self._on_duplicate_command('list-scenes')
self.skip_cuts = skip_cuts or self.config.get_value('list-scenes', 'skip-cuts')
self.print_scene_list = not (quiet or self.config.get_value('list-scenes', 'quiet'))
no_output_file = no_output_file or self.config.get_value('list-scenes', 'no-output-file')
self.scene_list_directory = self.config.get_value(
'list-scenes', 'output', output, ignore_default=True)
self.scene_list_name_format = self.config.get_value('list-scenes', 'filename', filename)
if self.scene_list_name_format is not None and not no_output_file:
logger.info('Scene list filename format:\n %s', self.scene_list_name_format)
self.scene_list_output = not no_output_file
if self.scene_list_directory is not None:
logger.info('Scene list output directory:\n %s', self.scene_list_directory)
self.list_scenes = True
def handle_split_video(
self,
output: Optional[AnyStr],
filename: Optional[AnyStr],
quiet: bool,
copy: bool,
high_quality: bool,
rate_factor: Optional[int],
preset: Optional[str],
args: Optional[str],
mkvmerge: bool,
):
"""Handle `split-video` command options."""
self._ensure_input_open()
if self.split_video:
self._on_duplicate_command('split-video')
check_split_video_requirements(use_mkvmerge=mkvmerge)
if contains_sequence_or_url(self.video_stream.path):
error_str = 'The split-video command is incompatible with image sequences/URLs.'
raise click.BadParameter(error_str, param_hint='split-video')
##
## Common Arguments/Options
##
self.split_video = True
self.split_quiet = quiet or self.config.get_value('split-video', 'quiet')
self.split_directory = self.config.get_value(
'split-video', 'output', output, ignore_default=True)
if self.split_directory is not None:
logger.info('Video output path set: \n%s', self.split_directory)
self.split_name_format = self.config.get_value('split-video', 'filename', filename)
# We only load the config values for these flags/options if none of the other
# encoder flags/options were set via the CLI to avoid any conflicting options
# (e.g. if the config file sets `high-quality = yes` but `--copy` is specified).
if not (mkvmerge or copy or high_quality or args or rate_factor or preset):
mkvmerge = self.config.get_value('split-video', 'mkvmerge')
copy = self.config.get_value('split-video', 'copy')
high_quality = self.config.get_value('split-video', 'high-quality')
rate_factor = self.config.get_value('split-video', 'rate-factor')
preset = self.config.get_value('split-video', 'preset')
args = self.config.get_value('split-video', 'args')
# Disallow certain combinations of flags/options.
if mkvmerge or copy:
command = 'mkvmerge (-m)' if mkvmerge else 'copy (-c)'
if high_quality:
raise click.BadParameter(
'high-quality (-hq) cannot be used with %s' % (command),
param_hint='split-video')
if args:
raise click.BadParameter(
'args (-a) cannot be used with %s' % (command), param_hint='split-video')
if rate_factor:
raise click.BadParameter(
'rate-factor (crf) cannot be used with %s' % (command),
param_hint='split-video')
if preset:
raise click.BadParameter(
'preset (-p) cannot be used with %s' % (command), param_hint='split-video')
##
## mkvmerge-Specific Arguments/Options
##
if mkvmerge:
if copy:
logger.warning('copy mode (-c) ignored due to mkvmerge mode (-m).')
self.split_mkvmerge = True
logger.info('Using mkvmerge for video splitting.')
return
##
## ffmpeg-Specific Arguments/Options
##
if copy:
args = '-map 0 -c:v copy -c:a copy'
elif not args:
if rate_factor is None:
rate_factor = 22 if not high_quality else 17
if preset is None:
preset = 'veryfast' if not high_quality else 'slow'
args = ('-map 0 -c:v libx264 -preset {PRESET} -crf {RATE_FACTOR} -c:a aac'.format(
PRESET=preset, RATE_FACTOR=rate_factor))
logger.info('ffmpeg arguments: %s', args)
self.split_args = args
if filename:
logger.info('Output file name format: %s', filename)
def handle_save_images(
self,
num_images: Optional[int],
output: Optional[AnyStr],
filename: Optional[AnyStr],
jpeg: bool,
webp: bool,
quality: Optional[int],
png: bool,
compression: Optional[int],
frame_margin: Optional[int],
scale: Optional[float],
height: Optional[int],
width: Optional[int],
):
"""Handle `save-images` command options."""
self._ensure_input_open()
if self.save_images:
self._on_duplicate_command('save-images')
if '://' in self.video_stream.path:
error_str = '\nThe save-images command is incompatible with URLs.'
logger.error(error_str)
raise click.BadParameter(error_str, param_hint='save-images')
num_flags = sum([1 if flag else 0 for flag in [jpeg, webp, png]])
if num_flags > 1:
logger.error('Multiple image type flags set for save-images command.')
raise click.BadParameter(
'Only one image type (JPG/PNG/WEBP) can be specified.', param_hint='save-images')
# Only use config params for image format if one wasn't specified.
elif num_flags == 0:
image_format = self.config.get_value('save-images', 'format').lower()
jpeg = image_format == 'jpeg'
webp = image_format == 'webp'
png = image_format == 'png'
# Only use config params for scale/height/width if none of them are specified explicitly.
if scale is None and height is None and width is None:
self.scale = self.config.get_value('save-images', 'scale')
self.height = self.config.get_value('save-images', 'height')
self.width = self.config.get_value('save-images', 'width')
else:
self.scale = scale
self.height = height
self.width = width
self.scale_method = Interpolation[self.config.get_value('save-images',
'scale-method').upper()]
default_quality = 100 if webp else 95
quality = (
default_quality if self.config.is_default('save-images', 'quality') else
self.config.get_value('save-images', 'quality'))
compression = self.config.get_value('save-images', 'compression', compression)
self.image_param = compression if png else quality
self.image_extension = 'jpg' if jpeg else 'png' if png else 'webp'
valid_params = get_cv2_imwrite_params()
if not self.image_extension in valid_params or valid_params[self.image_extension] is None:
error_strs = [
'Image encoder type `%s` not supported.' % self.image_extension.upper(),
'The specified encoder type could not be found in the current OpenCV module.',
'To enable this output format, please update the installed version of OpenCV.',
'If you build OpenCV, ensure the the proper dependencies are enabled. '
]
logger.debug('\n'.join(error_strs))
raise click.BadParameter('\n'.join(error_strs), param_hint='save-images')
self.image_directory = self.config.get_value(
'save-images', 'output', output, ignore_default=True)
self.image_name_format = self.config.get_value('save-images', 'filename', filename)
self.num_images = self.config.get_value('save-images', 'num-images', num_images)
self.frame_margin = self.config.get_value('save-images', 'frame-margin', frame_margin)
image_type = ('jpeg' if jpeg else self.image_extension).upper()
image_param_type = 'Compression' if png else 'Quality'
image_param_type = ' [%s: %d]' % (image_param_type, self.image_param)
logger.info('Image output format set: %s%s', image_type, image_param_type)
if self.image_directory is not None:
logger.info('Image output directory set:\n %s', os.path.abspath(self.image_directory))
self.save_images = True
def handle_time(self, start, duration, end):
"""Handle `time` command options."""
self._ensure_input_open()
if self.time:
self._on_duplicate_command('time')
if duration is not None and end is not None:
raise click.BadParameter(
'Only one of --duration/-d or --end/-e can be specified, not both.',
param_hint='time')
logger.debug('Setting video time:\n start: %s, duration: %s, end: %s', start, duration,
end)
self.start_time = parse_timecode(
start, self.video_stream.frame_rate, first_index_is_one=True)
self.end_time = parse_timecode(end, self.video_stream.frame_rate, first_index_is_one=True)
self.duration = parse_timecode(
duration, self.video_stream.frame_rate, first_index_is_one=True)
self.time = True
#
# Private Methods
#
def _initialize_logging(
self,
quiet: Optional[bool] = None,
verbosity: Optional[str] = None,
logfile: Optional[AnyStr] = None,
):
"""Setup logging based on CLI args and user configuration settings."""
if quiet is not None:
self.quiet_mode = bool(quiet)
curr_verbosity = logging.INFO
# Convert verbosity into it's log level enum, and override quiet mode if set.
if verbosity is not None:
assert verbosity in CHOICE_MAP['global']['verbosity']
if verbosity.lower() == 'none':
self.quiet_mode = True
verbosity = 'info'
else:
# Override quiet mode if verbosity is set.
self.quiet_mode = False
curr_verbosity = getattr(logging, verbosity.upper())
else:
verbosity_str = USER_CONFIG.get_value('global', 'verbosity')
assert verbosity_str in CHOICE_MAP['global']['verbosity']
if verbosity_str.lower() == 'none':
self.quiet_mode = True
else:
curr_verbosity = getattr(logging, verbosity_str.upper())
# Override quiet mode if verbosity is set.
if not USER_CONFIG.is_default('global', 'verbosity'):
self.quiet_mode = False
# Initialize logger with the set CLI args / user configuration.
init_logger(log_level=curr_verbosity, show_stdout=not self.quiet_mode, log_file=logfile)
def add_detector(self, detector):
""" Add Detector: Adds a detection algorithm to the CliContext's SceneManager. """
self._ensure_input_open()
try:
self.scene_manager.add_detector(detector)
except scenedetect.stats_manager.FrameMetricRegistered as ex:
raise click.BadParameter(
message='Cannot specify detection algorithm twice.',
param_hint=detector.cli_name) from ex
def _ensure_input_open(self) -> None:
"""Ensure self.video_stream was initialized (i.e. -i/--input was specified),
otherwise raises an exception. Should only be used from commands that require an
input video to process the options (e.g. those that require a timecode).
Raises:
click.BadParameter: self.video_stream was not initialized.
"""
if self.video_stream is None:
raise click.ClickException('No input video (-i/--input) was specified.')
def _open_video_stream(self, input_path: AnyStr, framerate: Optional[float],
backend: Optional[str]):
if '%' in input_path and backend != 'opencv':
raise click.BadParameter(
'The OpenCV backend (`--backend opencv`) must be used to process image sequences.',
param_hint='-i/--input')
if framerate is not None and framerate < MAX_FPS_DELTA:
raise click.BadParameter('Invalid framerate specified!', param_hint='-f/--framerate')
try:
if backend is None:
backend = self.config.get_value('global', 'backend')
else:
if not backend in AVAILABLE_BACKENDS:
raise click.BadParameter(
'Specified backend %s is not available on this system!' % backend,
param_hint='-b/--backend')
# Open the video with the specified backend, loading any required config settings.
if backend == 'pyav':
self.video_stream = open_video(
path=input_path,
framerate=framerate,
backend=backend,
threading_mode=self.config.get_value('backend-pyav', 'threading-mode'),
suppress_output=self.config.get_value('backend-pyav', 'suppress-output'),
)
elif backend == 'opencv':
self.video_stream = open_video(
path=input_path,
framerate=framerate,
backend=backend,
max_decode_attempts=self.config.get_value('backend-opencv',
'max-decode-attempts'),
)
# Handle backends without any config options.
else:
self.video_stream = open_video(
path=input_path,
framerate=framerate,
backend=backend,
)
logger.debug('Video opened using backend %s', type(self.video_stream).__name__)
except FrameRateUnavailable as ex:
raise click.BadParameter(
'Failed to obtain framerate for input video. Manually specify framerate with the'
' -f/--framerate option, or try re-encoding the file.',
param_hint='-i/--input') from ex
except VideoOpenFailure as ex:
raise click.BadParameter(
'Failed to open input video%s: %s' %
(' using %s backend' % backend if backend else '', str(ex)),
param_hint='-i/--input') from ex
except OSError as ex:
raise click.BadParameter('Input error:\n\n\t%s\n' % str(ex), param_hint='-i/--input')
def _on_duplicate_command(self, command: str) -> None:
"""Called when a command is duplicated to stop parsing and raise an error.
Arguments:
command: Command that was duplicated for error context.
Raises:
click.BadParameter
"""
error_strs = []
error_strs.append('Error: Command %s specified multiple times.' % command)
error_strs.append('The %s command may appear only one time.')
logger.error('\n'.join(error_strs))
raise click.BadParameter(
'\n Command %s may only be specified once.' % command,
param_hint='%s command' % command)

View File

@@ -0,0 +1,273 @@
# -*- coding: utf-8 -*-
#
# PySceneDetect: Python-Based Video Scene Detector
# -------------------------------------------------------------------
# [ Site: https://scenedetect.com ]
# [ Docs: https://scenedetect.com/docs/ ]
# [ Github: https://github.com/Breakthrough/PySceneDetect/ ]
#
# Copyright (C) 2014-2023 Brandon Castellano <http://www.bcastell.com>.
# PySceneDetect is licensed under the BSD 3-Clause License; see the
# included LICENSE file, or visit one of the above pages for details.
#
"""Logic for the PySceneDetect command."""
import logging
import os
from string import Template
import time
from typing import Dict, List, Tuple, Optional
from string import Template
from scenedetect.detectors import AdaptiveDetector
from scenedetect.frame_timecode import FrameTimecode
from scenedetect.platform import get_and_create_path, get_file_name
from scenedetect.scene_manager import save_images, write_scene_list, write_scene_list_html
from scenedetect.video_splitter import split_video_mkvmerge, split_video_ffmpeg
from scenedetect.video_stream import SeekError
from scenedetect._cli.context import CliContext, check_split_video_requirements
logger = logging.getLogger('pyscenedetect')
def run_scenedetect(context: CliContext):
"""Perform main CLI application control logic. Run once all command-line options and
configuration file options have been validated.
Arguments:
context: Prevalidated command-line option context to use for processing.
"""
# No input may have been specified depending on the commands/args that were used.
logger.debug("Running controller.")
if context.scene_manager is None:
logger.debug("No input specified.")
return
# Use default detector if one was not specified.
if context.scene_manager.get_num_detectors() == 0:
detector_type, detector_args = context.default_detector
logger.debug('Using default detector: %s(%s)' % (detector_type.__name__, detector_args))
context.scene_manager.add_detector(detector_type(**detector_args))
perf_start_time = time.time()
if context.start_time is not None:
logger.debug('Seeking to start time...')
try:
context.video_stream.seek(target=context.start_time)
except SeekError as ex:
logging.critical('Failed to seek to %s / frame %d: %s',
context.start_time.get_timecode(), context.start_time.get_frames(),
str(ex))
return
num_frames = context.scene_manager.detect_scenes(
video=context.video_stream,
duration=context.duration,
end_time=context.end_time,
frame_skip=context.frame_skip,
show_progress=not context.quiet_mode)
# Handle case where video failure is most likely due to multiple audio tracks (#179).
if num_frames <= 0 and context.video_stream.BACKEND_NAME == 'opencv':
logger.critical(
'Failed to read any frames from video file. This could be caused by the video'
' having multiple audio tracks. If so, try installing the PyAV backend:\n'
' pip install av\n'
'Or remove the audio tracks by running either:\n'
' ffmpeg -i input.mp4 -c copy -an output.mp4\n'
' mkvmerge -o output.mkv input.mp4\n'
'For details, see https://scenedetect.com/faq/')
return
perf_duration = time.time() - perf_start_time
logger.info('Processed %d frames in %.1f seconds (average %.2f FPS).', num_frames,
perf_duration,
float(num_frames) / perf_duration)
# Handle -s/--stats option.
_save_stats(context)
# Get list of detected cuts/scenes from the SceneManager to generate the required output
# files, based on the given commands (list-scenes, split-video, save-images, etc...).
cut_list = context.scene_manager.get_cut_list(show_warning=False)
scene_list = context.scene_manager.get_scene_list(start_in_scene=True)
# Handle --merge-last-scene.
if context.merge_last_scene and context.min_scene_len is not None and context.min_scene_len > 0:
if len(scene_list) > 1 and (scene_list[-1][1] - scene_list[-1][0]) < context.min_scene_len:
new_last_scene = (scene_list[-2][0], scene_list[-1][1])
scene_list = scene_list[:-2] + [new_last_scene]
# Handle --drop-short-scenes.
if context.drop_short_scenes and context.min_scene_len > 0:
scene_list = [s for s in scene_list if (s[1] - s[0]) >= context.min_scene_len]
# Ensure we don't divide by zero.
if scene_list:
logger.info(
'Detected %d scenes, average shot length %.1f seconds.', len(scene_list),
sum([(end_time - start_time).get_seconds() for start_time, end_time in scene_list]) /
float(len(scene_list)))
else:
logger.info('No scenes detected.')
# Handle list-scenes command.
_list_scenes(context, scene_list, cut_list)
# Handle save-images command.
image_filenames = _save_images(context, scene_list)
# Handle export-html command.
_export_html(context, scene_list, cut_list, image_filenames)
# Handle split-video command.
_split_video(context, scene_list)
def _save_stats(context: CliContext) -> None:
"""Handles saving the statsfile if -s/--stats was specified."""
if context.stats_file_path is not None:
# We check if the save is required in order to reduce unnecessary log messages.
if context.stats_manager.is_save_required():
logger.info('Saving frame metrics to stats file: %s',
os.path.basename(context.stats_file_path))
context.stats_manager.save_to_csv(csv_file=context.stats_file_path)
else:
logger.debug('No frame metrics updated, skipping update of the stats file.')
def _list_scenes(context: CliContext, scene_list: List[Tuple[FrameTimecode, FrameTimecode]],
cut_list: List[FrameTimecode]) -> None:
"""Handles the `list-scenes` command."""
if context.scene_list_output:
scene_list_filename = Template(
context.scene_list_name_format).safe_substitute(VIDEO_NAME=context.video_stream.name)
if not scene_list_filename.lower().endswith('.csv'):
scene_list_filename += '.csv'
scene_list_path = get_and_create_path(
scene_list_filename, context.scene_list_directory
if context.scene_list_directory is not None else context.output_directory)
logger.info('Writing scene list to CSV file:\n %s', scene_list_path)
with open(scene_list_path, 'wt') as scene_list_file:
write_scene_list(
output_csv_file=scene_list_file,
scene_list=scene_list,
include_cut_list=not context.skip_cuts,
cut_list=cut_list)
if context.print_scene_list:
logger.info(
"""Scene List:
-----------------------------------------------------------------------
| Scene # | Start Frame | Start Time | End Frame | End Time |
-----------------------------------------------------------------------
%s
-----------------------------------------------------------------------
""", '\n'.join([
' | %5d | %11d | %s | %11d | %s |' %
(i + 1, start_time.get_frames() + 1, start_time.get_timecode(),
end_time.get_frames(), end_time.get_timecode())
for i, (start_time, end_time) in enumerate(scene_list)
]))
if cut_list:
logger.info('Comma-separated timecode list:\n %s',
','.join([cut.get_timecode() for cut in cut_list]))
def _save_images(
context: CliContext,
scene_list: List[Tuple[FrameTimecode, FrameTimecode]]) -> Optional[Dict[int, List[str]]]:
"""Handles the `save-images` command."""
if not context.save_images:
return None
image_output_dir = context.output_directory
if context.image_directory is not None:
image_output_dir = context.image_directory
return save_images(
scene_list=scene_list,
video=context.video_stream,
num_images=context.num_images,
frame_margin=context.frame_margin,
image_extension=context.image_extension,
encoder_param=context.image_param,
image_name_template=context.image_name_format,
output_dir=image_output_dir,
show_progress=not context.quiet_mode,
scale=context.scale,
height=context.height,
width=context.width,
interpolation=context.scale_method)
def _export_html(context: CliContext, scene_list: List[Tuple[FrameTimecode, FrameTimecode]],
cut_list: List[FrameTimecode], image_filenames: Optional[Dict[int,
List[str]]]) -> None:
"""Handles the `export-html` command."""
if not context.export_html:
return
html_filename = Template(
context.html_name_format).safe_substitute(VIDEO_NAME=context.video_stream.name)
if not html_filename.lower().endswith('.html'):
html_filename += '.html'
html_path = get_and_create_path(
html_filename, context.image_directory
if context.image_directory is not None else context.output_directory)
logger.info('Exporting to html file:\n %s:', html_path)
if not context.html_include_images:
image_filenames = None
write_scene_list_html(
html_path,
scene_list,
cut_list,
image_filenames=image_filenames,
image_width=context.image_width,
image_height=context.image_height)
def _split_video(context: CliContext, scene_list: List[Tuple[FrameTimecode,
FrameTimecode]]) -> None:
"""Handles the `split-video` command."""
if not context.split_video:
return
output_path_template = context.split_name_format
# Add proper extension to filename template if required.
dot_pos = output_path_template.rfind('.')
extension_length = 0 if dot_pos < 0 else len(output_path_template) - (dot_pos + 1)
# If using mkvmerge, force extension to .mkv.
if context.split_mkvmerge and not output_path_template.endswith('.mkv'):
output_path_template += '.mkv'
# Otherwise, if using ffmpeg, only add an extension if one doesn't exist.
elif not 2 <= extension_length <= 4:
output_path_template += '.mp4'
# Pre-expand $VIDEO_NAME so it can be used for a directory.
# TODO: Do this elsewhere in a future version for all output options.
output_path_template = Template(output_path_template).safe_substitute(
VIDEO_NAME=get_file_name(context.video_stream.path, include_extension=False))
output_path_template = get_and_create_path(
output_path_template, context.split_directory
if context.split_directory is not None else context.output_directory)
# Ensure the appropriate tool is available before handling split-video.
check_split_video_requirements(context.split_mkvmerge)
if context.split_mkvmerge:
split_video_mkvmerge(
input_video_path=context.video_stream.path,
scene_list=scene_list,
output_file_template=output_path_template,
show_output=not (context.quiet_mode or context.split_quiet),
)
else:
split_video_ffmpeg(
input_video_path=context.video_stream.path,
scene_list=scene_list,
output_file_template=output_path_template,
arg_override=context.split_args,
show_progress=not context.quiet_mode,
show_output=not (context.quiet_mode or context.split_quiet),
)
if scene_list:
logger.info('Video splitting completed, scenes written to disk.')