refactor(caption): 重构字幕引擎结构 - Fixes #2

- 修复gummy字幕引擎长时间空置报错的问题
- 将 python-subprocess 文件夹重命名为 caption-engine
- 删除未使用的 prototype 代码
This commit is contained in:
himeditator
2025-07-05 12:45:43 +08:00
parent 259a033d24
commit 62671f6db5
15 changed files with 31 additions and 245 deletions

View File

@@ -0,0 +1,82 @@
from dashscope.audio.asr import (
TranslationRecognizerCallback,
TranscriptionResult,
TranslationResult,
TranslationRecognizerRealtime
)
from datetime import datetime
import json
import sys
class Callback(TranslationRecognizerCallback):
"""
语音大模型流式传输回调对象
"""
def __init__(self):
super().__init__()
self.usage = 0
self.cur_id = -1
self.time_str = ''
def on_open(self) -> None:
# print("on_open")
pass
def on_close(self) -> None:
# print("on_close")
pass
def on_event(
self,
request_id,
transcription_result: TranscriptionResult,
translation_result: TranslationResult,
usage
) -> None:
caption = {}
if transcription_result is not None:
caption['index'] = transcription_result.sentence_id
caption['text'] = transcription_result.text
if caption['index'] != self.cur_id:
self.cur_id = caption['index']
cur_time = datetime.now().strftime('%H:%M:%S')
caption['time_s'] = cur_time
self.time_str = cur_time
else:
caption['time_s'] = self.time_str
caption['time_t'] = datetime.now().strftime('%H:%M:%S')
caption['translation'] = ""
if translation_result is not None:
lang = translation_result.get_language_list()[0]
caption['translation'] = translation_result.get_translation(lang).text
if usage:
self.usage += usage['duration']
# print(caption)
self.send_to_node(caption)
def send_to_node(self, data):
"""
将数据发送到 Node.js 进程
"""
try:
json_data = json.dumps(data) + '\n'
sys.stdout.write(json_data)
sys.stdout.flush()
except Exception as e:
print(f"Error sending data to Node.js: {e}", file=sys.stderr)
class GummyTranslator:
def __init__(self, rate, source, target):
self.translator = TranslationRecognizerRealtime(
model = "gummy-realtime-v1",
format = "pcm",
sample_rate = rate,
transcription_enabled = True,
translation_enabled = (target is not None),
source_language = source,
translation_target_languages = [target],
callback = Callback()
)

View File

@@ -0,0 +1,51 @@
import sys
if sys.platform == 'win32':
from sysaudio.win import AudioStream, mergeStreamChannels
elif sys.platform == 'linux':
from sysaudio.linux import AudioStream, mergeStreamChannels
else:
raise NotImplementedError(f"Unsupported platform: {sys.platform}")
from audio2text.gummy import GummyTranslator
import sys
import argparse
def convert_audio_to_text(s_lang, t_lang, audio_type):
sys.stdout.reconfigure(line_buffering=True) # type: ignore
stream = AudioStream(audio_type)
stream.openStream()
if t_lang == 'none':
gummy = GummyTranslator(stream.RATE, s_lang, None)
else:
gummy = GummyTranslator(stream.RATE, s_lang, t_lang)
gummy.translator.start()
while True:
try:
if not stream.stream: continue
data = stream.stream.read(stream.CHUNK)
data = mergeStreamChannels(data, stream.CHANNELS)
try:
gummy.translator.send_audio_frame(data)
except:
gummy.translator.start()
gummy.translator.send_audio_frame(data)
except KeyboardInterrupt:
stream.closeStream()
gummy.translator.stop()
break
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Convert system audio stream to text')
parser.add_argument('-s', '--source_language', default='en', help='Source language code')
parser.add_argument('-t', '--target_language', default='zh', help='Target language code')
parser.add_argument('-a', '--audio_type', default='0', help='Audio stream source: 0 for output audio stream, 1 for input audio stream')
args = parser.parse_args()
convert_audio_to_text(
args.source_language,
args.target_language,
0 if args.audio_type == '0' else 1
)

View File

@@ -0,0 +1,38 @@
# -*- mode: python ; coding: utf-8 -*-
a = Analysis(
['main-gummy.py'],
pathex=[],
binaries=[],
datas=[],
hiddenimports=[],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
noarchive=False,
optimize=0,
)
pyz = PYZ(a.pure)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.datas,
[],
name='main-gummy',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
)

View File

@@ -0,0 +1,5 @@
dashscope==1.23.5
numpy==2.2.6
PyAudio==0.2.14
PyAudioWPatch==0.2.12.7 # Windows only
pyinstaller==6.14.1

View File

@@ -0,0 +1,79 @@
import pyaudio
import numpy as np
def mergeStreamChannels(data, channels):
"""
将当前多通道流数据合并为单通道流数据
Args:
data: 多通道数据
channels: 通道数
Returns:
mono_data_bytes: 单通道数据
"""
# (length * channels,)
data_np = np.frombuffer(data, dtype=np.int16)
# (length, channels)
data_np_r = data_np.reshape(-1, channels)
# (length,)
mono_data = np.mean(data_np_r.astype(np.float32), axis=1)
mono_data = mono_data.astype(np.int16)
mono_data_bytes = mono_data.tobytes()
return mono_data_bytes
class AudioStream:
def __init__(self, audio_type=1):
self.audio_type = audio_type
self.mic = pyaudio.PyAudio()
self.device = self.mic.get_default_input_device_info()
self.stream = None
self.SAMP_WIDTH = pyaudio.get_sample_size(pyaudio.paInt16)
self.FORMAT = pyaudio.paInt16
self.CHANNELS = self.device["maxInputChannels"]
self.RATE = int(self.device["defaultSampleRate"])
self.CHUNK = self.RATE // 20
self.INDEX = self.device["index"]
def printInfo(self):
dev_info = f"""
采样输入设备:
- 设备类型:{ "音频输入Linux平台目前仅支持该项" }
- 序号:{self.device['index']}
- 名称:{self.device['name']}
- 最大输入通道数:{self.device['maxInputChannels']}
- 默认低输入延迟:{self.device['defaultLowInputLatency']}s
- 默认高输入延迟:{self.device['defaultHighInputLatency']}s
- 默认采样率:{self.device['defaultSampleRate']}Hz
音频样本块大小:{self.CHUNK}
样本位宽:{self.SAMP_WIDTH}
音频数据格式:{self.FORMAT}
音频通道数:{self.CHANNELS}
音频采样率:{self.RATE}
"""
print(dev_info)
def openStream(self):
"""
打开并返回系统音频输出流
"""
if self.stream: return self.stream
self.stream = self.mic.open(
format = self.FORMAT,
channels = self.CHANNELS,
rate = self.RATE,
input = True,
input_device_index = self.INDEX
)
return self.stream
def closeStream(self):
"""
关闭系统音频输出流
"""
if self.stream is None: return
self.stream.stop_stream()
self.stream.close()
self.stream = None

View File

@@ -0,0 +1,127 @@
"""获取 Windows 系统音频输出流"""
import pyaudiowpatch as pyaudio
import numpy as np
def getDefaultLoopbackDevice(mic: pyaudio.PyAudio, info = True)->dict:
"""
获取默认的系统音频输出的回环设备
Args:
mic (pyaudio.PyAudio): pyaudio对象
info (bool, optional): 是否打印设备信息
Returns:
dict: 系统音频输出的回环设备
"""
try:
WASAPI_info = mic.get_host_api_info_by_type(pyaudio.paWASAPI)
except OSError:
print("Looks like WASAPI is not available on the system. Exiting...")
exit()
default_speaker = mic.get_device_info_by_index(WASAPI_info["defaultOutputDevice"])
if(info): print("wasapi_info:\n", WASAPI_info, "\n")
if(info): print("default_speaker:\n", default_speaker, "\n")
if not default_speaker["isLoopbackDevice"]:
for loopback in mic.get_loopback_device_info_generator():
if default_speaker["name"] in loopback["name"]:
default_speaker = loopback
if(info): print("Using loopback device:\n", default_speaker, "\n")
break
else:
print("Default loopback output device not found.")
print("Run `python -m pyaudiowpatch` to check available devices.")
print("Exiting...")
exit()
if(info): print(f"Output Stream Device: #{default_speaker['index']} {default_speaker['name']}")
return default_speaker
def mergeStreamChannels(data, channels):
"""
将当前多通道流数据合并为单通道流数据
Args:
data: 多通道数据
channels: 通道数
Returns:
mono_data_bytes: 单通道数据
"""
# (length * channels,)
data_np = np.frombuffer(data, dtype=np.int16)
# (length, channels)
data_np_r = data_np.reshape(-1, channels)
# (length,)
mono_data = np.mean(data_np_r.astype(np.float32), axis=1)
mono_data = mono_data.astype(np.int16)
mono_data_bytes = mono_data.tobytes()
return mono_data_bytes
class AudioStream:
"""
获取系统音频流
参数:
audio_type: 默认0-系统音频输出流1-系统音频输入流
"""
def __init__(self, audio_type=0):
self.audio_type = audio_type
self.mic = pyaudio.PyAudio()
if self.audio_type == 0:
self.device = getDefaultLoopbackDevice(self.mic, False)
else:
self.device = self.mic.get_default_input_device_info()
self.stream = None
self.SAMP_WIDTH = pyaudio.get_sample_size(pyaudio.paInt16)
self.FORMAT = pyaudio.paInt16
self.CHANNELS = self.device["maxInputChannels"]
self.RATE = int(self.device["defaultSampleRate"])
self.CHUNK = self.RATE // 20
self.INDEX = self.device["index"]
def printInfo(self):
dev_info = f"""
采样设备:
- 设备类型:{ "音频输入" if self.audio_type == 0 else "音频输出" }
- 序号:{self.device['index']}
- 名称:{self.device['name']}
- 最大输入通道数:{self.device['maxInputChannels']}
- 默认低输入延迟:{self.device['defaultLowInputLatency']}s
- 默认高输入延迟:{self.device['defaultHighInputLatency']}s
- 默认采样率:{self.device['defaultSampleRate']}Hz
- 是否回环设备:{self.device['isLoopbackDevice']}
音频样本块大小:{self.CHUNK}
样本位宽:{self.SAMP_WIDTH}
音频数据格式:{self.FORMAT}
音频通道数:{self.CHANNELS}
音频采样率:{self.RATE}
"""
print(dev_info)
def openStream(self):
"""
打开并返回系统音频输出流
"""
if self.stream: return self.stream
self.stream = self.mic.open(
format = self.FORMAT,
channels = self.CHANNELS,
rate = self.RATE,
input = True,
input_device_index = self.INDEX
)
return self.stream
def closeStream(self):
"""
关闭系统音频输出流
"""
if self.stream is None: return
self.stream.stop_stream()
self.stream.close()
self.stream = None