mirror of
https://github.com/HiMeditator/auto-caption.git
synced 2026-02-15 04:14:46 +08:00
refactor(engine): 字幕引擎文件夹重命名,字幕记录添加降序选择
- 字幕记录表格可以按时间降序排列 - 将 caption-engine 重命名为 engine - 更新了相关文件和文件夹的路径 - 修改了 README 和 TODO 文档中的相关内容 - 更新了 Electron 构建配置
This commit is contained in:
2
engine/audio2text/__init__.py
Normal file
2
engine/audio2text/__init__.py
Normal file
@@ -0,0 +1,2 @@
|
||||
from dashscope.common.error import InvalidParameter
|
||||
from .gummy import GummyTranslator
|
||||
105
engine/audio2text/gummy.py
Normal file
105
engine/audio2text/gummy.py
Normal file
@@ -0,0 +1,105 @@
|
||||
from dashscope.audio.asr import (
|
||||
TranslationRecognizerCallback,
|
||||
TranscriptionResult,
|
||||
TranslationResult,
|
||||
TranslationRecognizerRealtime
|
||||
)
|
||||
import dashscope
|
||||
from datetime import datetime
|
||||
import json
|
||||
import sys
|
||||
|
||||
class Callback(TranslationRecognizerCallback):
|
||||
"""
|
||||
语音大模型流式传输回调对象
|
||||
"""
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.usage = 0
|
||||
self.cur_id = -1
|
||||
self.time_str = ''
|
||||
|
||||
def on_open(self) -> None:
|
||||
# print("on_open")
|
||||
pass
|
||||
|
||||
def on_close(self) -> None:
|
||||
# print("on_close")
|
||||
pass
|
||||
|
||||
def on_event(
|
||||
self,
|
||||
request_id,
|
||||
transcription_result: TranscriptionResult,
|
||||
translation_result: TranslationResult,
|
||||
usage
|
||||
) -> None:
|
||||
caption = {}
|
||||
if transcription_result is not None:
|
||||
caption['index'] = transcription_result.sentence_id
|
||||
caption['text'] = transcription_result.text
|
||||
if caption['index'] != self.cur_id:
|
||||
self.cur_id = caption['index']
|
||||
cur_time = datetime.now().strftime('%H:%M:%S.%f')[:-3]
|
||||
caption['time_s'] = cur_time
|
||||
self.time_str = cur_time
|
||||
else:
|
||||
caption['time_s'] = self.time_str
|
||||
caption['time_t'] = datetime.now().strftime('%H:%M:%S.%f')[:-3]
|
||||
caption['translation'] = ""
|
||||
|
||||
if translation_result is not None:
|
||||
lang = translation_result.get_language_list()[0]
|
||||
caption['translation'] = translation_result.get_translation(lang).text
|
||||
|
||||
if usage:
|
||||
self.usage += usage['duration']
|
||||
|
||||
# print(caption)
|
||||
self.send_to_node(caption)
|
||||
|
||||
def send_to_node(self, data):
|
||||
"""
|
||||
将数据发送到 Node.js 进程
|
||||
"""
|
||||
try:
|
||||
json_data = json.dumps(data) + '\n'
|
||||
sys.stdout.write(json_data)
|
||||
sys.stdout.flush()
|
||||
except Exception as e:
|
||||
print(f"Error sending data to Node.js: {e}", file=sys.stderr)
|
||||
|
||||
class GummyTranslator:
|
||||
"""
|
||||
使用 Gummy 引擎流式处理的音频数据,并在标准输出中输出与 Auto Caption 软件可读取的 JSON 字符串数据
|
||||
|
||||
初始化参数:
|
||||
rate: 音频采样率
|
||||
source: 源语言代码字符串(zh, en, ja 等)
|
||||
target: 目标语言代码字符串(zh, en, ja 等)
|
||||
"""
|
||||
def __init__(self, rate, source, target, api_key):
|
||||
if api_key:
|
||||
dashscope.api_key = api_key
|
||||
self.translator = TranslationRecognizerRealtime(
|
||||
model = "gummy-realtime-v1",
|
||||
format = "pcm",
|
||||
sample_rate = rate,
|
||||
transcription_enabled = True,
|
||||
translation_enabled = (target is not None),
|
||||
source_language = source,
|
||||
translation_target_languages = [target],
|
||||
callback = Callback()
|
||||
)
|
||||
|
||||
def start(self):
|
||||
"""启动 Gummy 引擎"""
|
||||
self.translator.start()
|
||||
|
||||
def send_audio_frame(self, data):
|
||||
"""发送音频帧"""
|
||||
self.translator.send_audio_frame(data)
|
||||
|
||||
def stop(self):
|
||||
"""停止 Gummy 引擎"""
|
||||
self.translator.stop()
|
||||
1
engine/audioprcs/__init__.py
Normal file
1
engine/audioprcs/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .process import mergeChunkChannels, resampleRawChunk, resampleMonoChunk
|
||||
68
engine/audioprcs/process.py
Normal file
68
engine/audioprcs/process.py
Normal file
@@ -0,0 +1,68 @@
|
||||
import samplerate
|
||||
import numpy as np
|
||||
|
||||
def mergeChunkChannels(chunk, channels):
|
||||
"""
|
||||
将当前多通道音频数据块转换为单通道音频数据块
|
||||
|
||||
Args:
|
||||
chunk: (bytes)多通道音频数据块
|
||||
channels: 通道数
|
||||
|
||||
Returns:
|
||||
(bytes)单通道音频数据块
|
||||
"""
|
||||
# (length * channels,)
|
||||
chunk_np = np.frombuffer(chunk, dtype=np.int16)
|
||||
# (length, channels)
|
||||
chunk_np = chunk_np.reshape(-1, channels)
|
||||
# (length,)
|
||||
chunk_mono_f = np.mean(chunk_np.astype(np.float32), axis=1)
|
||||
chunk_mono = np.round(chunk_mono_f).astype(np.int16)
|
||||
return chunk_mono.tobytes()
|
||||
|
||||
|
||||
def resampleRawChunk(chunk, channels, orig_sr, target_sr, mode="sinc_best"):
|
||||
"""
|
||||
将当前多通道音频数据块转换成单通道音频数据块,然后进行重采样
|
||||
|
||||
Args:
|
||||
chunk: (bytes)多通道音频数据块
|
||||
channels: 通道数
|
||||
orig_sr: 原始采样率
|
||||
target_sr: 目标采样率
|
||||
mode: 重采样模式,可选:'sinc_best' | 'sinc_medium' | 'sinc_fastest' | 'zero_order_hold' | 'linear'
|
||||
|
||||
Return:
|
||||
(bytes)单通道音频数据块
|
||||
"""
|
||||
# (length * channels,)
|
||||
chunk_np = np.frombuffer(chunk, dtype=np.int16)
|
||||
# (length, channels)
|
||||
chunk_np = chunk_np.reshape(-1, channels)
|
||||
# (length,)
|
||||
chunk_mono_f = np.mean(chunk_np.astype(np.float32), axis=1)
|
||||
chunk_mono = chunk_mono_f.astype(np.int16)
|
||||
ratio = target_sr / orig_sr
|
||||
chunk_mono_r = samplerate.resample(chunk_mono, ratio, converter_type=mode)
|
||||
chunk_mono_r = np.round(chunk_mono_r).astype(np.int16)
|
||||
return chunk_mono_r.tobytes()
|
||||
|
||||
def resampleMonoChunk(chunk, orig_sr, target_sr, mode="sinc_best"):
|
||||
"""
|
||||
将当前单通道音频块进行重采样
|
||||
|
||||
Args:
|
||||
chunk: (bytes)单通道音频数据块
|
||||
orig_sr: 原始采样率
|
||||
target_sr: 目标采样率
|
||||
mode: 重采样模式,可选:'sinc_best' | 'sinc_medium' | 'sinc_fastest' | 'zero_order_hold' | 'linear'
|
||||
|
||||
Return:
|
||||
(bytes)单通道音频数据块
|
||||
"""
|
||||
chunk_np = np.frombuffer(chunk, dtype=np.int16)
|
||||
ratio = target_sr / orig_sr
|
||||
chunk_r = samplerate.resample(chunk_np, ratio, converter_type=mode)
|
||||
chunk_r = np.round(chunk_r).astype(np.int16)
|
||||
return chunk_r.tobytes()
|
||||
58
engine/main-gummy.py
Normal file
58
engine/main-gummy.py
Normal file
@@ -0,0 +1,58 @@
|
||||
import sys
|
||||
import argparse
|
||||
|
||||
if sys.platform == 'win32':
|
||||
from sysaudio.win import AudioStream
|
||||
elif sys.platform == 'darwin':
|
||||
from sysaudio.darwin import AudioStream
|
||||
elif sys.platform == 'linux':
|
||||
from sysaudio.linux import AudioStream
|
||||
else:
|
||||
raise NotImplementedError(f"Unsupported platform: {sys.platform}")
|
||||
|
||||
from audioprcs import mergeChunkChannels
|
||||
from audio2text import InvalidParameter, GummyTranslator
|
||||
|
||||
|
||||
def convert_audio_to_text(s_lang, t_lang, audio_type, chunk_rate, api_key):
|
||||
sys.stdout.reconfigure(line_buffering=True) # type: ignore
|
||||
stream = AudioStream(audio_type, chunk_rate)
|
||||
|
||||
if t_lang == 'none':
|
||||
gummy = GummyTranslator(stream.RATE, s_lang, None, api_key)
|
||||
else:
|
||||
gummy = GummyTranslator(stream.RATE, s_lang, t_lang, api_key)
|
||||
|
||||
stream.openStream()
|
||||
gummy.start()
|
||||
|
||||
while True:
|
||||
try:
|
||||
chunk = stream.read_chunk()
|
||||
chunk_mono = mergeChunkChannels(chunk, stream.CHANNELS)
|
||||
try:
|
||||
gummy.send_audio_frame(chunk_mono)
|
||||
except InvalidParameter:
|
||||
gummy.start()
|
||||
gummy.send_audio_frame(chunk_mono)
|
||||
except KeyboardInterrupt:
|
||||
stream.closeStream()
|
||||
gummy.stop()
|
||||
break
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description='Convert system audio stream to text')
|
||||
parser.add_argument('-s', '--source_language', default='en', help='Source language code')
|
||||
parser.add_argument('-t', '--target_language', default='zh', help='Target language code')
|
||||
parser.add_argument('-a', '--audio_type', default=0, help='Audio stream source: 0 for output audio stream, 1 for input audio stream')
|
||||
parser.add_argument('-c', '--chunk_rate', default=20, help='The number of audio stream chunks collected per second.')
|
||||
parser.add_argument('-k', '--api_key', default='', help='API KEY for Gummy model')
|
||||
args = parser.parse_args()
|
||||
convert_audio_to_text(
|
||||
args.source_language,
|
||||
args.target_language,
|
||||
int(args.audio_type),
|
||||
int(args.chunk_rate),
|
||||
args.api_key
|
||||
)
|
||||
39
engine/main-gummy.spec
Normal file
39
engine/main-gummy.spec
Normal file
@@ -0,0 +1,39 @@
|
||||
# -*- mode: python ; coding: utf-8 -*-
|
||||
|
||||
|
||||
a = Analysis(
|
||||
['main-gummy.py'],
|
||||
pathex=[],
|
||||
binaries=[],
|
||||
datas=[],
|
||||
hiddenimports=[],
|
||||
hookspath=[],
|
||||
hooksconfig={},
|
||||
runtime_hooks=[],
|
||||
excludes=[],
|
||||
noarchive=False,
|
||||
optimize=0,
|
||||
)
|
||||
pyz = PYZ(a.pure)
|
||||
|
||||
exe = EXE(
|
||||
pyz,
|
||||
a.scripts,
|
||||
a.binaries,
|
||||
a.datas,
|
||||
[],
|
||||
name='main-gummy',
|
||||
debug=False,
|
||||
bootloader_ignore_signals=False,
|
||||
strip=False,
|
||||
upx=True,
|
||||
upx_exclude=[],
|
||||
runtime_tmpdir=None,
|
||||
console=True,
|
||||
disable_windowed_traceback=False,
|
||||
argv_emulation=False,
|
||||
target_arch=None,
|
||||
codesign_identity=None,
|
||||
entitlements_file=None,
|
||||
onefile=True,
|
||||
)
|
||||
83
engine/main-vosk.py
Normal file
83
engine/main-vosk.py
Normal file
@@ -0,0 +1,83 @@
|
||||
import sys
|
||||
import json
|
||||
import argparse
|
||||
from datetime import datetime
|
||||
import numpy.core.multiarray
|
||||
|
||||
if sys.platform == 'win32':
|
||||
from sysaudio.win import AudioStream
|
||||
elif sys.platform == 'darwin':
|
||||
from sysaudio.darwin import AudioStream
|
||||
elif sys.platform == 'linux':
|
||||
from sysaudio.linux import AudioStream
|
||||
else:
|
||||
raise NotImplementedError(f"Unsupported platform: {sys.platform}")
|
||||
|
||||
from vosk import Model, KaldiRecognizer, SetLogLevel
|
||||
from audioprcs import resampleRawChunk
|
||||
|
||||
SetLogLevel(-1)
|
||||
|
||||
def convert_audio_to_text(audio_type, chunk_rate, model_path):
|
||||
sys.stdout.reconfigure(line_buffering=True) # type: ignore
|
||||
|
||||
if model_path.startswith('"'):
|
||||
model_path = model_path[1:]
|
||||
if model_path.endswith('"'):
|
||||
model_path = model_path[:-1]
|
||||
|
||||
model = Model(model_path)
|
||||
recognizer = KaldiRecognizer(model, 16000)
|
||||
|
||||
stream = AudioStream(audio_type, chunk_rate)
|
||||
stream.openStream()
|
||||
|
||||
time_str = ''
|
||||
cur_id = 0
|
||||
prev_content = ''
|
||||
|
||||
while True:
|
||||
chunk = stream.read_chunk()
|
||||
chunk_mono = resampleRawChunk(chunk, stream.CHANNELS, stream.RATE, 16000)
|
||||
|
||||
caption = {}
|
||||
if recognizer.AcceptWaveform(chunk_mono):
|
||||
content = json.loads(recognizer.Result()).get('text', '')
|
||||
caption['index'] = cur_id
|
||||
caption['text'] = content
|
||||
caption['time_s'] = time_str
|
||||
caption['time_t'] = datetime.now().strftime('%H:%M:%S.%f')[:-3]
|
||||
caption['translation'] = ''
|
||||
prev_content = ''
|
||||
cur_id += 1
|
||||
else:
|
||||
content = json.loads(recognizer.PartialResult()).get('partial', '')
|
||||
if content == '' or content == prev_content:
|
||||
continue
|
||||
if prev_content == '':
|
||||
time_str = datetime.now().strftime('%H:%M:%S.%f')[:-3]
|
||||
caption['index'] = cur_id
|
||||
caption['text'] = content
|
||||
caption['time_s'] = time_str
|
||||
caption['time_t'] = datetime.now().strftime('%H:%M:%S.%f')[:-3]
|
||||
caption['translation'] = ''
|
||||
prev_content = content
|
||||
try:
|
||||
json_str = json.dumps(caption) + '\n'
|
||||
sys.stdout.write(json_str)
|
||||
sys.stdout.flush()
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description='Convert system audio stream to text')
|
||||
parser.add_argument('-a', '--audio_type', default=0, help='Audio stream source: 0 for output audio stream, 1 for input audio stream')
|
||||
parser.add_argument('-c', '--chunk_rate', default=20, help='The number of audio stream chunks collected per second.')
|
||||
parser.add_argument('-m', '--model_path', default='', help='The path to the vosk model.')
|
||||
args = parser.parse_args()
|
||||
convert_audio_to_text(
|
||||
int(args.audio_type),
|
||||
int(args.chunk_rate),
|
||||
args.model_path
|
||||
)
|
||||
47
engine/main-vosk.spec
Normal file
47
engine/main-vosk.spec
Normal file
@@ -0,0 +1,47 @@
|
||||
# -*- mode: python ; coding: utf-8 -*-
|
||||
|
||||
from pathlib import Path
|
||||
import sys
|
||||
|
||||
if sys.platform == 'win32':
|
||||
vosk_path = str(Path('./subenv/Lib/site-packages/vosk').resolve())
|
||||
else:
|
||||
vosk_path = str(Path('./subenv/lib/python3.12/site-packages/vosk').resolve())
|
||||
|
||||
a = Analysis(
|
||||
['main-vosk.py'],
|
||||
pathex=[],
|
||||
binaries=[],
|
||||
datas=[(vosk_path, 'vosk')],
|
||||
hiddenimports=[],
|
||||
hookspath=[],
|
||||
hooksconfig={},
|
||||
runtime_hooks=[],
|
||||
excludes=[],
|
||||
noarchive=False,
|
||||
optimize=0,
|
||||
)
|
||||
|
||||
pyz = PYZ(a.pure)
|
||||
|
||||
exe = EXE(
|
||||
pyz,
|
||||
a.scripts,
|
||||
a.binaries,
|
||||
a.datas,
|
||||
[],
|
||||
name='main-vosk',
|
||||
debug=False,
|
||||
bootloader_ignore_signals=False,
|
||||
strip=False,
|
||||
upx=True,
|
||||
upx_exclude=[],
|
||||
runtime_tmpdir=None,
|
||||
console=True,
|
||||
disable_windowed_traceback=False,
|
||||
argv_emulation=False,
|
||||
target_arch=None,
|
||||
codesign_identity=None,
|
||||
entitlements_file=None,
|
||||
onefile=True,
|
||||
)
|
||||
6
engine/requirements_darwin.txt
Normal file
6
engine/requirements_darwin.txt
Normal file
@@ -0,0 +1,6 @@
|
||||
dashscope
|
||||
numpy
|
||||
samplerate
|
||||
PyAudio
|
||||
vosk
|
||||
pyinstaller
|
||||
5
engine/requirements_linux.txt
Normal file
5
engine/requirements_linux.txt
Normal file
@@ -0,0 +1,5 @@
|
||||
dashscope
|
||||
numpy
|
||||
vosk
|
||||
pyinstaller
|
||||
samplerate # pip install samplerate --only-binary=:all:
|
||||
6
engine/requirements_win.txt
Normal file
6
engine/requirements_win.txt
Normal file
@@ -0,0 +1,6 @@
|
||||
dashscope
|
||||
numpy
|
||||
samplerate
|
||||
PyAudioWPatch
|
||||
vosk
|
||||
pyinstaller
|
||||
0
engine/sysaudio/__init__.py
Normal file
0
engine/sysaudio/__init__.py
Normal file
85
engine/sysaudio/darwin.py
Normal file
85
engine/sysaudio/darwin.py
Normal file
@@ -0,0 +1,85 @@
|
||||
"""获取 MacOS 系统音频输入/输出流"""
|
||||
|
||||
import pyaudio
|
||||
|
||||
|
||||
class AudioStream:
|
||||
"""
|
||||
获取系统音频流(支持 BlackHole 作为系统音频输出捕获)
|
||||
|
||||
初始化参数:
|
||||
audio_type: 0-系统音频输出流(需配合 BlackHole),1-系统音频输入流
|
||||
chunk_rate: 每秒采集音频块的数量,默认为20
|
||||
"""
|
||||
def __init__(self, audio_type=0, chunk_rate=20):
|
||||
self.audio_type = audio_type
|
||||
self.mic = pyaudio.PyAudio()
|
||||
if self.audio_type == 0:
|
||||
self.device = self.getOutputDeviceInfo()
|
||||
else:
|
||||
self.device = self.mic.get_default_input_device_info()
|
||||
self.stream = None
|
||||
self.SAMP_WIDTH = pyaudio.get_sample_size(pyaudio.paInt16)
|
||||
self.FORMAT = pyaudio.paInt16
|
||||
self.CHANNELS = self.device["maxInputChannels"]
|
||||
self.RATE = int(self.device["defaultSampleRate"])
|
||||
self.CHUNK = self.RATE // chunk_rate
|
||||
self.INDEX = self.device["index"]
|
||||
|
||||
def getOutputDeviceInfo(self):
|
||||
"""查找指定关键词的输入设备"""
|
||||
device_count = self.mic.get_device_count()
|
||||
for i in range(device_count):
|
||||
dev_info = self.mic.get_device_info_by_index(i)
|
||||
if 'blackhole' in dev_info["name"].lower():
|
||||
return dev_info
|
||||
raise Exception("The device containing BlackHole was not found.")
|
||||
|
||||
def printInfo(self):
|
||||
dev_info = f"""
|
||||
采样输入设备:
|
||||
- 设备类型:{ "音频输出" if self.audio_type == 0 else "音频输入" }
|
||||
- 序号:{self.device['index']}
|
||||
- 名称:{self.device['name']}
|
||||
- 最大输入通道数:{self.device['maxInputChannels']}
|
||||
- 默认低输入延迟:{self.device['defaultLowInputLatency']}s
|
||||
- 默认高输入延迟:{self.device['defaultHighInputLatency']}s
|
||||
- 默认采样率:{self.device['defaultSampleRate']}Hz
|
||||
|
||||
音频样本块大小:{self.CHUNK}
|
||||
样本位宽:{self.SAMP_WIDTH}
|
||||
采样格式:{self.FORMAT}
|
||||
音频通道数:{self.CHANNELS}
|
||||
音频采样率:{self.RATE}
|
||||
"""
|
||||
print(dev_info)
|
||||
|
||||
def openStream(self):
|
||||
"""
|
||||
打开并返回系统音频输出流
|
||||
"""
|
||||
if self.stream: return self.stream
|
||||
self.stream = self.mic.open(
|
||||
format = self.FORMAT,
|
||||
channels = int(self.CHANNELS),
|
||||
rate = self.RATE,
|
||||
input = True,
|
||||
input_device_index = int(self.INDEX)
|
||||
)
|
||||
return self.stream
|
||||
|
||||
def read_chunk(self):
|
||||
"""
|
||||
读取音频数据
|
||||
"""
|
||||
if not self.stream: return None
|
||||
return self.stream.read(self.CHUNK, exception_on_overflow=False)
|
||||
|
||||
def closeStream(self):
|
||||
"""
|
||||
关闭系统音频输出流
|
||||
"""
|
||||
if self.stream is None: return
|
||||
self.stream.stop_stream()
|
||||
self.stream.close()
|
||||
self.stream = None
|
||||
94
engine/sysaudio/linux.py
Normal file
94
engine/sysaudio/linux.py
Normal file
@@ -0,0 +1,94 @@
|
||||
"""获取 Linux 系统音频输入流"""
|
||||
|
||||
import subprocess
|
||||
|
||||
def findMonitorSource():
|
||||
result = subprocess.run(
|
||||
["pactl", "list", "short", "sources"],
|
||||
stdout=subprocess.PIPE, text=True
|
||||
)
|
||||
lines = result.stdout.splitlines()
|
||||
|
||||
for line in lines:
|
||||
parts = line.split('\t')
|
||||
if len(parts) >= 2 and ".monitor" in parts[1]:
|
||||
return parts[1]
|
||||
|
||||
raise RuntimeError("System output monitor device not found")
|
||||
|
||||
def findInputSource():
|
||||
result = subprocess.run(
|
||||
["pactl", "list", "short", "sources"],
|
||||
stdout=subprocess.PIPE, text=True
|
||||
)
|
||||
lines = result.stdout.splitlines()
|
||||
|
||||
for line in lines:
|
||||
parts = line.split('\t')
|
||||
name = parts[1]
|
||||
if ".monitor" not in name:
|
||||
return name
|
||||
raise RuntimeError("Microphone input device not found")
|
||||
|
||||
class AudioStream:
|
||||
"""
|
||||
获取系统音频流
|
||||
|
||||
初始化参数:
|
||||
audio_type: 0-系统音频输出流(不支持,不会生效),1-系统音频输入流(默认)
|
||||
chunk_rate: 每秒采集音频块的数量,默认为20
|
||||
"""
|
||||
def __init__(self, audio_type=1, chunk_rate=20):
|
||||
self.audio_type = audio_type
|
||||
|
||||
if self.audio_type == 0:
|
||||
self.source = findMonitorSource()
|
||||
else:
|
||||
self.source = findInputSource()
|
||||
|
||||
self.process = None
|
||||
|
||||
self.SAMP_WIDTH = 2
|
||||
self.FORMAT = 16
|
||||
self.CHANNELS = 2
|
||||
self.RATE = 48000
|
||||
self.CHUNK = self.RATE // chunk_rate
|
||||
|
||||
def printInfo(self):
|
||||
dev_info = f"""
|
||||
音频捕获进程:
|
||||
- 捕获类型:{"音频输出" if self.audio_type == 0 else "音频输入"}
|
||||
- 设备源:{self.source}
|
||||
- 捕获进程PID:{self.process.pid if self.process else "None"}
|
||||
|
||||
音频样本块大小:{self.CHUNK}
|
||||
样本位宽:{self.SAMP_WIDTH}
|
||||
采样格式:{self.FORMAT}
|
||||
音频通道数:{self.CHANNELS}
|
||||
音频采样率:{self.RATE}
|
||||
"""
|
||||
print(dev_info)
|
||||
|
||||
def openStream(self):
|
||||
"""
|
||||
启动音频捕获进程
|
||||
"""
|
||||
self.process = subprocess.Popen(
|
||||
["parec", "-d", self.source, "--format=s16le", "--rate=48000", "--channels=2"],
|
||||
stdout=subprocess.PIPE
|
||||
)
|
||||
|
||||
def read_chunk(self):
|
||||
"""
|
||||
读取音频数据
|
||||
"""
|
||||
if self.process:
|
||||
return self.process.stdout.read(self.CHUNK)
|
||||
return None
|
||||
|
||||
def closeStream(self):
|
||||
"""
|
||||
关闭系统音频捕获进程
|
||||
"""
|
||||
if self.process:
|
||||
self.process.terminate()
|
||||
113
engine/sysaudio/win.py
Normal file
113
engine/sysaudio/win.py
Normal file
@@ -0,0 +1,113 @@
|
||||
"""获取 Windows 系统音频输入/输出流"""
|
||||
|
||||
import pyaudiowpatch as pyaudio
|
||||
|
||||
|
||||
def getDefaultLoopbackDevice(mic: pyaudio.PyAudio, info = True)->dict:
|
||||
"""
|
||||
获取默认的系统音频输出的回环设备
|
||||
Args:
|
||||
mic (pyaudio.PyAudio): pyaudio对象
|
||||
info (bool, optional): 是否打印设备信息
|
||||
|
||||
Returns:
|
||||
dict: 系统音频输出的回环设备
|
||||
"""
|
||||
try:
|
||||
WASAPI_info = mic.get_host_api_info_by_type(pyaudio.paWASAPI)
|
||||
except OSError:
|
||||
print("Looks like WASAPI is not available on the system. Exiting...")
|
||||
exit()
|
||||
|
||||
default_speaker = mic.get_device_info_by_index(WASAPI_info["defaultOutputDevice"])
|
||||
if(info): print("wasapi_info:\n", WASAPI_info, "\n")
|
||||
if(info): print("default_speaker:\n", default_speaker, "\n")
|
||||
|
||||
if not default_speaker["isLoopbackDevice"]:
|
||||
for loopback in mic.get_loopback_device_info_generator():
|
||||
if default_speaker["name"] in loopback["name"]:
|
||||
default_speaker = loopback
|
||||
if(info): print("Using loopback device:\n", default_speaker, "\n")
|
||||
break
|
||||
else:
|
||||
print("Default loopback output device not found.")
|
||||
print("Run `python -m pyaudiowpatch` to check available devices.")
|
||||
print("Exiting...")
|
||||
exit()
|
||||
|
||||
if(info): print(f"Output Stream Device: #{default_speaker['index']} {default_speaker['name']}")
|
||||
return default_speaker
|
||||
|
||||
|
||||
class AudioStream:
|
||||
"""
|
||||
获取系统音频流
|
||||
|
||||
初始化参数:
|
||||
audio_type: 0-系统音频输出流(默认),1-系统音频输入流
|
||||
chunk_rate: 每秒采集音频块的数量,默认为20
|
||||
"""
|
||||
def __init__(self, audio_type=0, chunk_rate=20):
|
||||
self.audio_type = audio_type
|
||||
self.mic = pyaudio.PyAudio()
|
||||
if self.audio_type == 0:
|
||||
self.device = getDefaultLoopbackDevice(self.mic, False)
|
||||
else:
|
||||
self.device = self.mic.get_default_input_device_info()
|
||||
self.stream = None
|
||||
self.SAMP_WIDTH = pyaudio.get_sample_size(pyaudio.paInt16)
|
||||
self.FORMAT = pyaudio.paInt16
|
||||
self.CHANNELS = int(self.device["maxInputChannels"])
|
||||
self.RATE = int(self.device["defaultSampleRate"])
|
||||
self.CHUNK = self.RATE // chunk_rate
|
||||
self.INDEX = self.device["index"]
|
||||
|
||||
def printInfo(self):
|
||||
dev_info = f"""
|
||||
采样设备:
|
||||
- 设备类型:{ "音频输出" if self.audio_type == 0 else "音频输入" }
|
||||
- 序号:{self.device['index']}
|
||||
- 名称:{self.device['name']}
|
||||
- 最大输入通道数:{self.device['maxInputChannels']}
|
||||
- 默认低输入延迟:{self.device['defaultLowInputLatency']}s
|
||||
- 默认高输入延迟:{self.device['defaultHighInputLatency']}s
|
||||
- 默认采样率:{self.device['defaultSampleRate']}Hz
|
||||
- 是否回环设备:{self.device['isLoopbackDevice']}
|
||||
|
||||
音频样本块大小:{self.CHUNK}
|
||||
样本位宽:{self.SAMP_WIDTH}
|
||||
采样格式:{self.FORMAT}
|
||||
音频通道数:{self.CHANNELS}
|
||||
音频采样率:{self.RATE}
|
||||
"""
|
||||
print(dev_info)
|
||||
|
||||
def openStream(self):
|
||||
"""
|
||||
打开并返回系统音频输出流
|
||||
"""
|
||||
if self.stream: return self.stream
|
||||
self.stream = self.mic.open(
|
||||
format = self.FORMAT,
|
||||
channels = self.CHANNELS,
|
||||
rate = self.RATE,
|
||||
input = True,
|
||||
input_device_index = self.INDEX
|
||||
)
|
||||
return self.stream
|
||||
|
||||
def read_chunk(self):
|
||||
"""
|
||||
读取音频数据
|
||||
"""
|
||||
if not self.stream: return None
|
||||
return self.stream.read(self.CHUNK, exception_on_overflow=False)
|
||||
|
||||
def closeStream(self):
|
||||
"""
|
||||
关闭系统音频输出流
|
||||
"""
|
||||
if self.stream is None: return
|
||||
self.stream.stop_stream()
|
||||
self.stream.close()
|
||||
self.stream = None
|
||||
Reference in New Issue
Block a user