修改方案,优先考虑 python 实现

This commit is contained in:
himeditator
2025-06-13 19:54:00 +08:00
parent d6ae8d51dd
commit f10530eb67
7 changed files with 170 additions and 0 deletions

1
.gitignore vendored
View File

@@ -4,3 +4,4 @@ out
.DS_Store
.eslintcache
*.log*
__pycache__/

View File

@@ -0,0 +1,8 @@
这是项目的 python 实现。使用 Tkinter 创建 GUI。
拟实现功能:
- [ ] 可以获取 Windows 系统音频流
- [ ] 可以获取 Linux 系统视频流
- [ ] 添加字幕图形界面
- [ ] 界面中可以实时显示当前系统音频对应的字幕

View File

@@ -0,0 +1,67 @@
from dashscope.audio.asr import \
TranslationRecognizerCallback, \
TranscriptionResult, \
TranslationResult, \
TranslationRecognizerRealtime
class Callback(TranslationRecognizerCallback):
"""
语音大模型流式传输回调对象
"""
def __init__(self):
super().__init__()
self.usage = 0
self.sentences = []
self.translations = []
def on_open(self) -> None:
print("\nGummy 流式翻译开始...\n")
def on_close(self) -> None:
print(f"\nTokens消耗{self.usage}")
print(f"流式翻译结束...\n")
for i in range(len(self.sentences)):
print(f"\n{self.sentences[i]}\n{self.translations[i]}\n")
def on_event(
self,
request_id,
transcription_result: TranscriptionResult,
translation_result: TranslationResult,
usage
) -> None:
if transcription_result is not None:
id = transcription_result.sentence_id
text = transcription_result.text
if transcription_result.stash is not None:
stash = transcription_result.stash.text
else:
stash = ""
print(f"#{id}: {text}{stash}")
if usage: self.sentences.append(text)
if translation_result is not None:
lang = translation_result.get_language_list()[0]
text = translation_result.get_translation(lang).text
if translation_result.get_translation(lang).stash is not None:
stash = translation_result.get_translation(lang).stash.text
else:
stash = ""
print(f"#{lang}: {text}{stash}")
if usage: self.translations.append(text)
if usage: self.usage += usage['duration']
def getGummpyTranslator(rate) -> TranslationRecognizerRealtime:
translator = TranslationRecognizerRealtime(
model = "gummy-realtime-v1",
format = "pcm",
sample_rate = rate,
transcription_enabled = True,
translation_enabled = True,
source_language = "ja",
translation_target_languages = ["zh"],
callback = Callback()
)
return translator

41
python-subprocess/main.py Normal file
View File

@@ -0,0 +1,41 @@
from sysaudio.win import getDefaultLoopbackDevice
from audio2text.gummy import getGummpyTranslator
import pyaudiowpatch as pyaudio
import numpy as np
mic = pyaudio.PyAudio()
loopback = getDefaultLoopbackDevice(mic)
SAMP_WIDTH = pyaudio.get_sample_size(pyaudio.paInt16)
FORMAT = pyaudio.paInt16
CHANNELS = loopback["maxInputChannels"]
RATE = int(loopback["defaultSampleRate"])
CHUNK = RATE // 10
INDEX = loopback["index"]
RECORD_SECONDS = 20 # 监听时长(s)
stream = mic.open(
format = FORMAT,
channels = CHANNELS,
rate = RATE,
input = True,
input_device_index = INDEX
)
translator = getGummpyTranslator(rate=RATE)
translator.start()
for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
data = stream.read(CHUNK)
data_np = np.frombuffer(data, dtype=np.int16)
data_np_r = data_np.reshape(-1, CHANNELS)
mono_data = np.mean(data_np_r.astype(np.float32), axis=1)
mono_data = mono_data.astype(np.int16)
mono_data_bytes = mono_data.tobytes()
translator.send_audio_frame(mono_data_bytes)
translator.stop()
stream.stop_stream()
stream.close()

View File

@@ -0,0 +1,53 @@
"""获取 Windows 系统音频输出流"""
import pyaudiowpatch as pyaudio
def getDefaultLoopbackDevice(mic: pyaudio.PyAudio, info = True)->dict:
"""
获取默认的系统音频输出的回环设备
Args:
mic (pyaudio.PyAudio): pyaudio对象
info (bool, optional): 是否打印设备信息
Returns:
dict: 系统音频输出的回环设备
"""
try:
WASAPI_info = mic.get_host_api_info_by_type(pyaudio.paWASAPI)
except OSError:
print("Looks like WASAPI is not available on the system. Exiting...")
exit()
default_speaker = mic.get_device_info_by_index(WASAPI_info["defaultOutputDevice"])
if(info): print("wasapi_info:\n", WASAPI_info, "\n")
if(info): print("default_speaker:\n", default_speaker, "\n")
if not default_speaker["isLoopbackDevice"]:
for loopback in mic.get_loopback_device_info_generator():
if default_speaker["name"] in loopback["name"]:
default_speaker = loopback
if(info): print("Using loopback device:\n", default_speaker, "\n")
break
else:
print("Default loopback output device not found.")
print("Run `python -m pyaudiowpatch` to check available devices.")
print("Exiting...")
exit()
if(info): print(f"Output Stream Device: #{default_speaker['index']} {default_speaker['name']}")
return default_speaker
def getOutputStream():
mic = pyaudio.PyAudio()
default_speaker = getDefaultLoopbackDevice(mic, False)
stream = mic.open(
format = pyaudio.paInt16,
channels = default_speaker["maxInputChannels"],
rate = int(default_speaker["defaultSampleRate"]),
input = True,
input_device_index = default_speaker["index"]
)
return stream