mirror of
https://github.com/HiMeditator/auto-caption.git
synced 2026-02-18 23:14:49 +08:00
refactor(engine): 重构字幕引擎
- 更新 GummyTranslator 类,优化字幕生成逻辑 - 移除 audioprcs 模块,音频处理功能转移到 utils 模块 - 重构 sysaudio 模块,提高音频流管理的灵活性和稳定性 - 修改 TODO.md,完成按时间降序排列字幕记录的功能 - 更新文档,说明因资源限制将不再维护英文和日文文档
This commit is contained in:
@@ -0,0 +1,10 @@
|
||||
import sys
|
||||
|
||||
# Re-export the AudioStream implementation that matches the host OS.
# Each branch imports from a sibling module of this package, so only the
# backend selected by sys.platform is imported.
if sys.platform == "win32":
    from .win import AudioStream
elif sys.platform == "darwin":
    from .darwin import AudioStream
elif sys.platform == "linux":
    from .linux import AudioStream
else:
    # Any other sys.platform value has no backend; fail at import time.
    raise NotImplementedError(f"Unsupported platform: {sys.platform}")
|
||||
@@ -1,11 +1,24 @@
|
||||
"""获取 MacOS 系统音频输入/输出流"""
|
||||
|
||||
import pyaudio
|
||||
from textwrap import dedent
|
||||
|
||||
|
||||
def get_blackhole_device(mic: pyaudio.PyAudio):
    """
    Return the info of the first audio device whose name contains "blackhole".

    Devices are queried in index order through ``mic`` and the name is
    compared case-insensitively.

    Args:
        mic: object used to enumerate devices (``get_device_count`` and
            ``get_device_info_by_index`` are called on it).

    Returns:
        The device info mapping of the first matching device.

    Raises:
        Exception: if no device name contains "blackhole".
    """
    total = mic.get_device_count()
    # Lazy generator: devices are only queried until the first match is found.
    infos = (mic.get_device_info_by_index(idx) for idx in range(total))
    found = next(
        (info for info in infos if 'blackhole' in str(info["name"]).lower()),
        None,
    )
    if found is None:
        raise Exception("The device containing BlackHole was not found.")
    return found
|
||||
|
||||
|
||||
class AudioStream:
|
||||
"""
|
||||
获取系统音频流(支持 BlackHole 作为系统音频输出捕获)
|
||||
获取系统音频流(如果要捕获输出音频,仅支持 BlackHole 作为系统音频输出捕获)
|
||||
|
||||
初始化参数:
|
||||
audio_type: 0-系统音频输出流(需配合 BlackHole),1-系统音频输入流
|
||||
@@ -15,46 +28,40 @@ class AudioStream:
|
||||
self.audio_type = audio_type
|
||||
self.mic = pyaudio.PyAudio()
|
||||
if self.audio_type == 0:
|
||||
self.device = self.getOutputDeviceInfo()
|
||||
self.device = get_blackhole_device(self.mic)
|
||||
else:
|
||||
self.device = self.mic.get_default_input_device_info()
|
||||
self.stop_signal = False
|
||||
self.stream = None
|
||||
self.SAMP_WIDTH = pyaudio.get_sample_size(pyaudio.paInt16)
|
||||
self.INDEX = self.device["index"]
|
||||
self.FORMAT = pyaudio.paInt16
|
||||
self.CHANNELS = self.device["maxInputChannels"]
|
||||
self.SAMP_WIDTH = pyaudio.get_sample_size(self.FORMAT)
|
||||
self.CHANNELS = int(self.device["maxInputChannels"])
|
||||
self.RATE = int(self.device["defaultSampleRate"])
|
||||
self.CHUNK = self.RATE // chunk_rate
|
||||
self.INDEX = self.device["index"]
|
||||
|
||||
def getOutputDeviceInfo(self):
|
||||
"""查找指定关键词的输入设备"""
|
||||
device_count = self.mic.get_device_count()
|
||||
for i in range(device_count):
|
||||
dev_info = self.mic.get_device_info_by_index(i)
|
||||
if 'blackhole' in dev_info["name"].lower():
|
||||
return dev_info
|
||||
raise Exception("The device containing BlackHole was not found.")
|
||||
|
||||
def printInfo(self):
|
||||
def get_info(self):
    """
    Return a text summary of the capture device and the stream parameters.

    The summary is built from ``self.device`` and the ``INDEX``, ``FORMAT``,
    ``SAMP_WIDTH``, ``CHANNELS``, ``RATE`` and ``CHUNK`` attributes, then
    dedented and stripped. Nothing is printed.

    Returns:
        str: the formatted, multi-line description.
    """
    # 'isLoopbackDevice' is read with .get(): that key is an addition of the
    # Windows PyAudioWPatch fork, so a device dict without it would raise
    # KeyError if indexed directly.
    dev_info = f"""
    采样设备:
    - 设备类型:{ "音频输出" if self.audio_type == 0 else "音频输入" }
    - 设备序号:{self.device['index']}
    - 设备名称:{self.device['name']}
    - 最大输入通道数:{self.device['maxInputChannels']}
    - 默认低输入延迟:{self.device['defaultLowInputLatency']}s
    - 默认高输入延迟:{self.device['defaultHighInputLatency']}s
    - 默认采样率:{self.device['defaultSampleRate']}Hz
    - 是否回环设备:{self.device.get('isLoopbackDevice', False)}

    设备序号:{self.INDEX}
    样本格式:{self.FORMAT}
    样本位宽:{self.SAMP_WIDTH}
    样本通道数:{self.CHANNELS}
    样本采样率:{self.RATE}
    样本块大小:{self.CHUNK}
    """
    return dedent(dev_info).strip()
|
||||
|
||||
def openStream(self):
|
||||
def open_stream(self):
|
||||
"""
|
||||
打开并返回系统音频输出流
|
||||
"""
|
||||
@@ -72,14 +79,24 @@ class AudioStream:
|
||||
"""
|
||||
读取音频数据
|
||||
"""
|
||||
if self.stop_signal:
|
||||
self.close_stream()
|
||||
return None
|
||||
if not self.stream: return None
|
||||
return self.stream.read(self.CHUNK, exception_on_overflow=False)
|
||||
|
||||
def closeStream(self):
|
||||
def close_stream_signal(self):
    """
    Request that the audio stream be closed, without closing it here.

    Only raises ``stop_signal``; ``self.stream`` is left untouched, so the
    close does not necessarily happen immediately. The flag is set even when
    no stream is currently open.
    NOTE(review): intended as the thread-safe way to stop — presumably the
    reading side performs the actual close when it sees the flag; confirm.
    """
    # Do not stop/close the stream from here: that is close_stream()'s job.
    self.stop_signal = True
|
||||
|
||||
def close_stream(self):
    """
    Close the audio stream immediately and reset the stop flag.

    If a stream is open it is stopped and closed. ``self.stream`` is set to
    ``None`` and ``stop_signal`` to ``False`` in every case, including when
    stopping or closing raises; the exception still propagates to the caller.
    """
    stream = self.stream
    try:
        if stream is not None:
            try:
                stream.stop_stream()
            finally:
                # Release the stream even if stopping it failed.
                stream.close()
    finally:
        # Always leave the object in the "closed, no pending stop" state so a
        # failed close is not retried forever through a stale stop_signal.
        self.stream = None
        self.stop_signal = False
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
"""获取 Linux 系统音频输入流"""
|
||||
|
||||
import subprocess
|
||||
from textwrap import dedent
|
||||
|
||||
def findMonitorSource():
|
||||
|
||||
def find_monitor_source():
|
||||
result = subprocess.run(
|
||||
["pactl", "list", "short", "sources"],
|
||||
stdout=subprocess.PIPE, text=True
|
||||
@@ -16,7 +18,8 @@ def findMonitorSource():
|
||||
|
||||
raise RuntimeError("System output monitor device not found")
|
||||
|
||||
def findInputSource():
|
||||
|
||||
def find_input_source():
|
||||
result = subprocess.run(
|
||||
["pactl", "list", "short", "sources"],
|
||||
stdout=subprocess.PIPE, text=True
|
||||
@@ -28,8 +31,10 @@ def findInputSource():
|
||||
name = parts[1]
|
||||
if ".monitor" not in name:
|
||||
return name
|
||||
|
||||
raise RuntimeError("Microphone input device not found")
|
||||
|
||||
|
||||
class AudioStream:
|
||||
"""
|
||||
获取系统音频流
|
||||
@@ -42,34 +47,33 @@ class AudioStream:
|
||||
self.audio_type = audio_type
|
||||
|
||||
if self.audio_type == 0:
|
||||
self.source = findMonitorSource()
|
||||
self.source = find_monitor_source()
|
||||
else:
|
||||
self.source = findInputSource()
|
||||
|
||||
self.source = find_input_source()
|
||||
self.stop_signal = False
|
||||
self.process = None
|
||||
|
||||
self.SAMP_WIDTH = 2
|
||||
self.FORMAT = 16
|
||||
self.SAMP_WIDTH = 2
|
||||
self.CHANNELS = 2
|
||||
self.RATE = 48000
|
||||
self.CHUNK = self.RATE // chunk_rate
|
||||
|
||||
def printInfo(self):
|
||||
def get_info(self):
    """
    Return a text summary of the capture process and the stream parameters.

    The summary is built from ``self.audio_type``, ``self.source``, the PID
    of ``self.process`` (or "None" when no process is set) and the
    ``FORMAT``, ``SAMP_WIDTH``, ``CHANNELS``, ``RATE`` and ``CHUNK``
    attributes. It is returned dedented and stripped instead of being
    printed, so the method yields a usable value.

    Returns:
        str: the formatted, multi-line description.
    """
    dev_info = f"""
    音频捕获进程:
    - 捕获类型:{"音频输出" if self.audio_type == 0 else "音频输入"}
    - 设备源:{self.source}
    - 捕获进程 PID:{self.process.pid if self.process else "None"}

    样本格式:{self.FORMAT}
    样本位宽:{self.SAMP_WIDTH}
    样本通道数:{self.CHANNELS}
    样本采样率:{self.RATE}
    样本块大小:{self.CHUNK}
    """
    return dedent(dev_info).strip()
|
||||
|
||||
def openStream(self):
|
||||
def open_stream(self):
|
||||
"""
|
||||
启动音频捕获进程
|
||||
"""
|
||||
@@ -82,13 +86,23 @@ class AudioStream:
|
||||
"""
|
||||
读取音频数据
|
||||
"""
|
||||
if self.process:
|
||||
if self.stop_signal:
|
||||
self.close_stream()
|
||||
return None
|
||||
if self.process and self.process.stdout:
|
||||
return self.process.stdout.read(self.CHUNK)
|
||||
return None
|
||||
|
||||
def closeStream(self):
|
||||
def close_stream_signal(self):
    """
    Request that the capture be stopped by raising ``stop_signal``.

    Nothing is terminated here; only the flag is set, so the shutdown does
    not necessarily happen immediately.
    NOTE(review): described as the thread-safe way to stop — presumably the
    reading side acts on the flag; confirm against read_chunk.
    """
    self.stop_signal = True
|
||||
|
||||
def close_stream(self):
    """
    Shut down the audio capture process and reset the stop flag.

    If a process is set it is terminated and waited for (killed if it does
    not exit within one second), and its stdout pipe is closed.
    ``self.process`` is then set to ``None`` and ``stop_signal`` to
    ``False``, also when shutting the process down raises.
    """
    proc = self.process
    try:
        if proc is not None:
            proc.terminate()
            try:
                # Reap the child so it does not linger as a zombie.
                proc.wait(timeout=1)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()
            if proc.stdout:
                proc.stdout.close()
    finally:
        # Drop the reference so the object no longer points at a dead process.
        self.process = None
        self.stop_signal = False
|
||||
|
||||
@@ -1,14 +1,15 @@
|
||||
"""获取 Windows 系统音频输入/输出流"""
|
||||
|
||||
import pyaudiowpatch as pyaudio
|
||||
from textwrap import dedent
|
||||
|
||||
|
||||
def getDefaultLoopbackDevice(mic: pyaudio.PyAudio, info = True)->dict:
|
||||
def get_default_loopback_device(mic: pyaudio.PyAudio, info = True)->dict:
|
||||
"""
|
||||
获取默认的系统音频输出的回环设备
|
||||
Args:
|
||||
mic (pyaudio.PyAudio): pyaudio对象
|
||||
info (bool, optional): 是否打印设备信息
|
||||
mic: pyaudio对象
|
||||
info: 是否打印设备信息
|
||||
|
||||
Returns:
|
||||
dict: 系统音频输出的回环设备
|
||||
@@ -51,38 +52,40 @@ class AudioStream:
|
||||
self.audio_type = audio_type
|
||||
self.mic = pyaudio.PyAudio()
|
||||
if self.audio_type == 0:
|
||||
self.device = getDefaultLoopbackDevice(self.mic, False)
|
||||
self.device = get_default_loopback_device(self.mic, False)
|
||||
else:
|
||||
self.device = self.mic.get_default_input_device_info()
|
||||
self.stop_signal = False
|
||||
self.stream = None
|
||||
self.SAMP_WIDTH = pyaudio.get_sample_size(pyaudio.paInt16)
|
||||
self.INDEX = self.device["index"]
|
||||
self.FORMAT = pyaudio.paInt16
|
||||
self.SAMP_WIDTH = pyaudio.get_sample_size(self.FORMAT)
|
||||
self.CHANNELS = int(self.device["maxInputChannels"])
|
||||
self.RATE = int(self.device["defaultSampleRate"])
|
||||
self.CHUNK = self.RATE // chunk_rate
|
||||
self.INDEX = self.device["index"]
|
||||
|
||||
def printInfo(self):
|
||||
def get_info(self):
    """
    Return a text summary of the capture device and the stream parameters.

    The summary is built from ``self.device`` and the ``INDEX``, ``FORMAT``,
    ``SAMP_WIDTH``, ``CHANNELS``, ``RATE`` and ``CHUNK`` attributes, then
    dedented and stripped. Each field appears once and nothing is printed.

    Returns:
        str: the formatted, multi-line description.
    """
    dev_info = f"""
    采样设备:
    - 设备类型:{ "音频输出" if self.audio_type == 0 else "音频输入" }
    - 设备序号:{self.device['index']}
    - 设备名称:{self.device['name']}
    - 最大输入通道数:{self.device['maxInputChannels']}
    - 默认低输入延迟:{self.device['defaultLowInputLatency']}s
    - 默认高输入延迟:{self.device['defaultHighInputLatency']}s
    - 默认采样率:{self.device['defaultSampleRate']}Hz
    - 是否回环设备:{self.device['isLoopbackDevice']}

    设备序号:{self.INDEX}
    样本格式:{self.FORMAT}
    样本位宽:{self.SAMP_WIDTH}
    样本通道数:{self.CHANNELS}
    样本采样率:{self.RATE}
    样本块大小:{self.CHUNK}
    """
    return dedent(dev_info).strip()
|
||||
|
||||
def openStream(self):
|
||||
def open_stream(self):
|
||||
"""
|
||||
打开并返回系统音频输出流
|
||||
"""
|
||||
@@ -96,18 +99,28 @@ class AudioStream:
|
||||
)
|
||||
return self.stream
|
||||
|
||||
def read_chunk(self):
|
||||
def read_chunk(self) -> bytes | None:
|
||||
"""
|
||||
读取音频数据
|
||||
"""
|
||||
if self.stop_signal:
|
||||
self.close_stream()
|
||||
return None
|
||||
if not self.stream: return None
|
||||
return self.stream.read(self.CHUNK, exception_on_overflow=False)
|
||||
|
||||
def closeStream(self):
|
||||
def close_stream_signal(self):
    """
    Request that the audio stream be closed, without closing it here.

    Only raises ``stop_signal``; ``self.stream`` is left untouched, so the
    close does not necessarily happen immediately. The flag is set even when
    no stream is currently open.
    NOTE(review): intended as the thread-safe way to stop — presumably the
    reading side performs the actual close when it sees the flag; confirm.
    """
    # Do not stop/close the stream from here: that is close_stream()'s job.
    self.stop_signal = True
|
||||
|
||||
def close_stream(self):
    """
    Close the audio stream and reset the stop flag.

    If a stream is open it is stopped and closed. ``self.stream`` is set to
    ``None`` and ``stop_signal`` to ``False`` in every case, including when
    stopping or closing raises; the exception still propagates to the caller.
    """
    stream = self.stream
    try:
        if stream is not None:
            try:
                stream.stop_stream()
            finally:
                # Release the stream even if stopping it failed.
                stream.close()
    finally:
        # Always leave the object in the "closed, no pending stop" state so a
        # failed close is not retried forever through a stale stop_signal.
        self.stream = None
        self.stop_signal = False
|
||||
|
||||
Reference in New Issue
Block a user