refactor(项目): 尝试 Python 语音识别和内容发送

This commit is contained in:
himeditator
2025-06-17 21:26:16 +08:00
parent 1e83ad2199
commit d1bee65ae1
11 changed files with 158 additions and 357 deletions

View File

@@ -11,10 +11,6 @@
"output_type": "stream",
"text": [
"Received: {\"message\":\"Electron Initialized\"}\n",
"Received: {\"command\":\"process_data\",\"payload\":{\"some\":\"data\"}}\n",
"Client disconnected\n",
"Received: {\"message\":\"Electron Initialized\"}\n",
"Received: {\"command\":\"process_data\",\"payload\":{\"some\":\"data\"}}\n",
"Client disconnected\n"
]
}
@@ -23,6 +19,7 @@
"import asyncio\n",
"import websockets\n",
"import nest_asyncio\n",
"import json\n",
"\n",
"# 应用补丁,允许在 Jupyter 中运行嵌套事件循环\n",
"nest_asyncio.apply()\n",
@@ -31,7 +28,7 @@
" try:\n",
" async for message in websocket:\n",
" print(f\"Received: {message}\")\n",
" await websocket.send(f\"Echo: {message}\")\n",
" await websocket.send(json.dumps({\"message\": \"Hello from server!\"}))\n",
" except websockets.exceptions.ConnectionClosed:\n",
" print(\"Client disconnected\")\n",
"\n",

View File

@@ -1,21 +0,0 @@
import asyncio
import websockets
import json # 导入 json 模块
# WebSocket 服务器处理函数
async def echo(websocket):
    """Log every frame from the client and answer each with a fixed JSON reply."""
    async for incoming in websocket:
        print(f"收到客户端消息: {incoming}")
        reply = {"respond": "Hello, Client!"}
        # json.dumps turns the dict into the JSON text the client expects.
        await websocket.send(json.dumps(reply))
        print(f"已发送响应: {reply}")
# 启动服务器
async def main():
    """Run the echo server on localhost:8765 until the process is killed."""
    server = websockets.serve(echo, "localhost", 8765)
    async with server:
        await asyncio.Future()  # never resolves -> keeps the server running
if __name__ == "__main__":
    # NOTE(review): this message is printed before the server actually binds
    # the port inside main() — confirm whether the ordering matters.
    print("WebSocket 服务器已启动,监听 ws://localhost:8765")
    asyncio.run(main())

View File

@@ -1,12 +0,0 @@
这是项目的 python 实现。使用 Tkinter 创建 GUI。
拟实现功能:
- [x] 可以获取 Windows 系统音频流
- [ ] 可以对音频流进行转换(调整声道数和采样率)
- [ ] 可以获取 Linux 系统音频流
- [ ] 添加字幕图形界面
- [ ] 字幕显示
- [ ] 字幕样式设置
- [ ] 字幕页面删除标题栏
- [ ] 界面中实时显示当前系统音频对应的字幕

View File

@@ -1,223 +0,0 @@
import pyaudiowpatch as pyaudio
import numpy as np
import tkinter as tk
from tkinter import ttk
from dashscope.audio.asr import (
TranslationRecognizerCallback,
TranslationRecognizerRealtime
)
import threading
import queue
class AudioCapture:
    """Captures audio from the default output device via a WASAPI loopback stream."""
    def __init__(self):
        self.audio = pyaudio.PyAudio()
        self.stream = None       # active capture stream, or None when stopped
        self.is_running = False
        self.setup_audio()
    def setup_audio(self):
        """Locate the default loopback device and cache its parameters.

        Raises:
            Exception: if WASAPI is unavailable or no loopback device matches
                the default speaker.
        """
        try:
            wasapi_info = self.audio.get_host_api_info_by_type(pyaudio.paWASAPI)
        except OSError:
            raise Exception("WASAPI 不可用")
        default_speaker = self.audio.get_device_info_by_index(wasapi_info["defaultOutputDevice"])
        if not default_speaker["isLoopbackDevice"]:
            # The default output itself is not a loopback; find the loopback
            # device whose name contains the speaker's name.
            for loopback in self.audio.get_loopback_device_info_generator():
                if default_speaker["name"] in loopback["name"]:
                    default_speaker = loopback
                    break
            else:
                raise Exception("未找到默认回环输出设备")
        self.device_info = default_speaker
        self.channels = default_speaker["maxInputChannels"]
        self.rate = int(default_speaker["defaultSampleRate"])
        # 100 ms worth of frames per read.
        self.chunk = self.rate // 10
    def start_stream(self):
        """Open the capture stream on the cached loopback device."""
        self.stream = self.audio.open(
            format=pyaudio.paInt16,
            channels=self.channels,
            rate=self.rate,
            input=True,
            input_device_index=self.device_info["index"]
        )
        self.is_running = True
    def stop_stream(self):
        """Stop and release the stream; safe to call more than once.

        Fixes: the original reset is_running only when a stream existed, and
        kept a reference to the closed stream, so a second call crashed.
        """
        self.is_running = False
        if self.stream:
            self.stream.stop_stream()
            self.stream.close()
            self.stream = None  # drop the closed stream so repeat calls are no-ops
            self.audio.terminate()
class CaptionCallback(TranslationRecognizerCallback):
    """Forwards recognizer events onto a thread-safe queue for the UI thread."""
    def __init__(self, text_queue):
        super().__init__()
        self.text_queue = text_queue
        self.usage = 0  # accumulated token duration
    def on_open(self) -> None:
        self.text_queue.put(("status", "开始识别..."))
    def on_close(self) -> None:
        self.text_queue.put(("status", f"识别结束,消耗 Tokens: {self.usage}"))
    def on_event(self, request_id, transcription_result, translation_result, usage) -> None:
        if transcription_result is not None:
            caption = transcription_result.text
            # Append the unconfirmed (stash) tail so the line looks live.
            if transcription_result.stash is not None:
                caption += transcription_result.stash.text
            self.text_queue.put(("caption", caption))
        if translation_result is not None:
            first_lang = translation_result.get_language_list()[0]
            translated = translation_result.get_translation(first_lang)
            piece = translated.text
            if translated.stash is not None:
                piece += translated.stash.text
            self.text_queue.put(("translation", piece))
        if usage:
            self.usage += usage['duration']
class CaptionApp:
    """Tkinter window that shows live captions and translations of system audio.

    Audio frames are pulled from the WASAPI loopback device on a worker
    thread, down-mixed to mono and streamed to the DashScope recognizer;
    recognizer events come back through a queue and drive the labels.
    """
    def __init__(self):
        self.root = tk.Tk()
        self.root.title("实时字幕")
        self.root.geometry("800x400")
        self.setup_ui()
        self.text_queue = queue.Queue()
        self.audio_capture = AudioCapture()
        self.translator = None
        self.is_running = False
        # Caches holding at most the two most recent caption/translation lines.
        self.caption_cache = []
        self.translation_cache = []
    def setup_ui(self):
        """Build the status label, the caption area and the control buttons."""
        # Status label
        self.status_label = ttk.Label(self.root, text="就绪")
        self.status_label.pack(pady=5)
        # Caption display area
        self.caption_frame = ttk.Frame(self.root)
        self.caption_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)
        # Two label pairs: newest caption/translation on top, previous below.
        self.caption_label1 = ttk.Label(self.caption_frame, text="", font=("Arial", 14))
        self.caption_label1.pack(fill=tk.X, pady=5)
        self.translation_label1 = ttk.Label(self.caption_frame, text="", font=("Arial", 14))
        self.translation_label1.pack(fill=tk.X, pady=5)
        self.caption_label2 = ttk.Label(self.caption_frame, text="", font=("Arial", 14))
        self.caption_label2.pack(fill=tk.X, pady=5)
        self.translation_label2 = ttk.Label(self.caption_frame, text="", font=("Arial", 14))
        self.translation_label2.pack(fill=tk.X, pady=5)
        # Control buttons
        self.control_frame = ttk.Frame(self.root)
        self.control_frame.pack(pady=10)
        self.start_button = ttk.Button(self.control_frame, text="开始", command=self.start_caption)
        self.start_button.pack(side=tk.LEFT, padx=5)
        self.stop_button = ttk.Button(self.control_frame, text="停止", command=self.stop_caption, state=tk.DISABLED)
        self.stop_button.pack(side=tk.LEFT, padx=5)
    def start_caption(self):
        """Create the recognizer, open the audio stream and spawn worker threads."""
        self.is_running = True
        self.start_button.config(state=tk.DISABLED)
        self.stop_button.config(state=tk.NORMAL)
        # Initialize the recognizer: Japanese speech -> Chinese translation.
        self.translator = TranslationRecognizerRealtime(
            model="gummy-realtime-v1",
            format="pcm",
            sample_rate=self.audio_capture.rate,
            transcription_enabled=True,
            translation_enabled=True,
            source_language="ja",
            translation_target_languages=["zh"],
            callback=CaptionCallback(self.text_queue)
        )
        # Start audio capture and the recognizer session.
        self.audio_capture.start_stream()
        self.translator.start()
        # Spawn the capture/forward loop and the UI-update loop as daemons.
        threading.Thread(target=self.process_audio, daemon=True).start()
        threading.Thread(target=self.update_ui, daemon=True).start()
    def stop_caption(self):
        """Stop both worker loops (via is_running) and release audio/recognizer."""
        self.is_running = False
        self.start_button.config(state=tk.NORMAL)
        self.stop_button.config(state=tk.DISABLED)
        if self.translator:
            self.translator.stop()
        self.audio_capture.stop_stream()
    def process_audio(self):
        """Worker loop: read a chunk, average channels to mono, feed recognizer.

        Any read/send failure is reported via the queue and ends the loop.
        """
        while self.is_running:
            try:
                data = self.audio_capture.stream.read(self.audio_capture.chunk)
                data_np = np.frombuffer(data, dtype=np.int16)
                data_np_r = data_np.reshape(-1, self.audio_capture.channels)
                # Down-mix: average channels in float32 to avoid int16 overflow.
                mono_data = np.mean(data_np_r.astype(np.float32), axis=1)
                mono_data = mono_data.astype(np.int16)
                mono_data_bytes = mono_data.tobytes()
                self.translator.send_audio_frame(mono_data_bytes)
            except Exception as e:
                self.text_queue.put(("error", str(e)))
                break
    def update_caption_display(self):
        """Render cached lines: *1 labels show the newest, *2 the previous."""
        # Update the caption labels.
        if len(self.caption_cache) > 0:
            self.caption_label1.config(text=self.caption_cache[-1])
        if len(self.caption_cache) > 1:
            self.caption_label2.config(text=self.caption_cache[-2])
        else:
            self.caption_label2.config(text="")
        # Update the translation labels.
        if len(self.translation_cache) > 0:
            self.translation_label1.config(text=f"翻译: {self.translation_cache[-1]}")
        if len(self.translation_cache) > 1:
            self.translation_label2.config(text=f"翻译: {self.translation_cache[-2]}")
        else:
            self.translation_label2.config(text="")
    def update_ui(self):
        """Worker loop: drain the event queue and update the widgets.

        NOTE(review): this runs on a background thread but calls widget
        .config() directly — tkinter is not thread-safe; confirm, or route
        updates through root.after().
        """
        while self.is_running:
            try:
                msg_type, text = self.text_queue.get(timeout=0.1)
                if msg_type == "status":
                    self.status_label.config(text=text)
                elif msg_type == "caption":
                    # Keep only the two most recent lines in the cache.
                    self.caption_cache.append(text)
                    if len(self.caption_cache) > 2:
                        self.caption_cache.pop(0)
                    self.update_caption_display()
                elif msg_type == "translation":
                    self.translation_cache.append(text)
                    if len(self.translation_cache) > 2:
                        self.translation_cache.pop(0)
                    self.update_caption_display()
                elif msg_type == "error":
                    self.status_label.config(text=f"错误: {text}")
                    self.stop_caption()
            except queue.Empty:
                continue
    def run(self):
        """Enter the Tk main loop (blocks until the window is closed)."""
        self.root.mainloop()
if __name__ == "__main__":
    # Launch the caption window; blocks in the Tk main loop until closed.
    app = CaptionApp()
    app.run()

View File

@@ -4,6 +4,7 @@ from dashscope.audio.asr import (
TranslationResult,
TranslationRecognizerRealtime
)
from datetime import datetime
class Callback(TranslationRecognizerCallback):
"""
@@ -12,17 +13,15 @@ class Callback(TranslationRecognizerCallback):
def __init__(self):
super().__init__()
self.usage = 0
self.sentences = []
self.translations = []
self.cur_id = -1
self.time_str = ''
def on_open(self) -> None:
print("\nGummy 流式翻译开始...\n")
print("INFO gummy translation start...")
def on_close(self) -> None:
print(f"\nTokens消耗{self.usage}")
print(f"流式翻译结束...\n")
for i in range(len(self.sentences)):
print(f"\n{self.sentences[i]}\n{self.translations[i]}\n")
print(f"INFO tokens useage: {self.usage}")
print(f"INFO translation end...")
def on_event(
self,
@@ -31,38 +30,37 @@ class Callback(TranslationRecognizerCallback):
translation_result: TranslationResult,
usage
) -> None:
caption = {}
if transcription_result is not None:
id = transcription_result.sentence_id
text = transcription_result.text
if transcription_result.stash is not None:
stash = transcription_result.stash.text
caption['id'] = transcription_result.sentence_id
caption['text'] = transcription_result.text
if caption['id'] != self.cur_id:
self.cur_id = caption['id']
cur_time = datetime.now().strftime('%H:%M:%S')
caption['time_s'] = cur_time
self.time_str = cur_time
else:
stash = ""
print(f"#{id}: {text}{stash}")
if usage: self.sentences.append(text)
caption['time_s'] = self.time_str
caption['time_t'] = datetime.now().strftime('%H:%M:%S')
caption['translation'] = ""
if translation_result is not None:
lang = translation_result.get_language_list()[0]
text = translation_result.get_translation(lang).text
if translation_result.get_translation(lang).stash is not None:
stash = translation_result.get_translation(lang).stash.text
else:
stash = ""
print(f"#{lang}: {text}{stash}")
if usage: self.translations.append(text)
caption['translation'] = translation_result.get_translation(lang).text
if usage: self.usage += usage['duration']
if usage:
self.usage += usage['duration']
print(caption)
class GummyTranslator:
def __init__(self, rate, source, target):
self.translator = TranslationRecognizerRealtime(
model = "gummy-realtime-v1",
format = "pcm",
sample_rate = rate,
transcription_enabled = True,
translation_enabled = True,
source_language = source,
translation_target_languages = [target],
callback = Callback()
)
model = "gummy-realtime-v1",
format = "pcm",
sample_rate = rate,
transcription_enabled = True,
translation_enabled = (target is not None),
source_language = source,
translation_target_languages = [target],
callback = Callback()
)

View File

@@ -1,17 +1,23 @@
import asyncio
from sysaudio.win import LoopbackStream, mergeStreamChannels
from audio2text.gummy import GummyTranslator
loopback = LoopbackStream()
loopback.openStream()
def main():
loopback = LoopbackStream()
loopback.openStream()
gummy = GummyTranslator(loopback.RATE, "ja", "zh")
gummy.translator.start()
gummy = GummyTranslator(loopback.RATE, "zh", "en")
gummy.translator.start()
for i in range(0, 100):
if not loopback.stream: continue
data = loopback.stream.read(loopback.CHUNK)
data = mergeStreamChannels(data, loopback.CHANNELS)
gummy.translator.send_audio_frame(data)
try:
for _ in range(0, 400):
if not loopback.stream: continue
data = loopback.stream.read(loopback.CHUNK)
data = mergeStreamChannels(data, loopback.CHANNELS)
gummy.translator.send_audio_frame(data)
finally:
gummy.translator.stop()
loopback.closeStream()
gummy.translator.stop()
loopback.closeStream()
if __name__ == "__main__":
main()

View File

@@ -70,7 +70,7 @@ class LoopbackStream:
self.FORMAT = pyaudio.paInt16
self.CHANNELS = self.loopback["maxInputChannels"]
self.RATE = int(self.loopback["defaultSampleRate"])
self.CHUNK = self.RATE // 10
self.CHUNK = self.RATE // 20
self.INDEX = self.loopback["index"]
def printInfo(self):

View File

@@ -0,0 +1,61 @@
import json
import websockets
class WebSocketServer:
    """Single-client WebSocket server that pushes JSON messages to a client."""
    def __init__(self):
        self.server = None     # websockets server handle once start() has run
        self.websocket = None  # most recently connected client, or None
    async def start(self, port=8765):
        """Start listening on ws://localhost:<port>."""
        self.server = await websockets.serve(self.handle_client, "localhost", port)
        print(f"INFO websocket server started on ws://localhost:{port}")
    async def stop(self):
        """Close the current client connection (if any) and shut the server down."""
        if self.server:
            try:
                if self.websocket:
                    await self.close()
                self.server.close()
                await self.server.wait_closed()
                print("INFO server closed successfully")
            except Exception as e:
                print(f"ERROR failed to close server: {e}")
            finally:
                self.server = None
    async def handle_client(self, websocket, path="/"):
        """Track the newest client connection and log everything it sends."""
        self.websocket = websocket
        try:
            async for message in websocket:
                print(f"INFO received: {message}")
        except websockets.exceptions.ConnectionClosed:
            print("INFO client disconnected")
        finally:
            # Fix: the original cleared self.websocket only on the exception
            # path, leaving a stale socket after a clean close, and could wipe
            # a newer client's reference. Clear only if still the current one.
            if self.websocket is websocket:
                self.websocket = None
    async def send(self, data):
        """JSON-encode `data` and send it to the connected client.

        Returns:
            bool: True on success, False if no client is connected or the
            connection dropped while sending.
        """
        if self.websocket:
            try:
                await self.websocket.send(json.dumps(data))
                print(f"INFO sent: {data}")
                return True
            except websockets.exceptions.ConnectionClosed:
                print("ERROR: Client disconnected while sending data")
                self.websocket = None
                return False
        return False
    async def close(self):
        """Close the client connection, logging (not raising) any failure."""
        if self.websocket:
            try:
                await self.websocket.close()
                print("INFO connection closed successfully")
            except Exception as e:
                print(f"ERROR failed to close connection: {e}")
            finally:
                self.websocket = None

View File

@@ -2,6 +2,9 @@ import { app, BrowserWindow } from 'electron'
import { electronApp, optimizer } from '@electron-toolkit/utils'
import { controlWindow } from './control'
import { captionWindow } from './caption'
import { WebSocketConnector } from './wsConnector'
const wsConnector = new WebSocketConnector()
app.whenReady().then(() => {
electronApp.setAppUserModelId('com.himeditator.autocaption')
@@ -15,6 +18,8 @@ app.whenReady().then(() => {
controlWindow.createWindow()
wsConnector.connect()
app.on('activate', function () {
if (BrowserWindow.getAllWindows().length === 0){
controlWindow.createWindow()

View File

@@ -1,52 +0,0 @@
import WebSocket from 'ws';
export class PythonConnector {
    /** Active connection to the Python WebSocket server, or null. */
    ws: WebSocket | null;
    constructor() {
        this.ws = null;
        this.connect();
    }
    /** Open (or re-open) the connection and wire up all socket events. */
    connect() {
        this.ws = new WebSocket('ws://localhost:8765');
        this.ws.on('open', () => {
            console.log('Python server connected');
            this.send({ message: 'Electron Initialized' });
        });
        this.ws.on('message', (data) => {
            // Fix: guard the parse — a malformed frame from Python must not
            // crash the Electron main process.
            let message;
            try {
                message = JSON.parse(data.toString());
            } catch (err) {
                console.error('Invalid JSON from Python:', err);
                return;
            }
            console.log('Get message from Python:', message);
            // Handle messages pushed from Python here.
            if (message.notification) {
                this.handleNotification(message.notification);
            }
        });
        this.ws.on('close', () => {
            console.log('Connection closed. Reconnecting...');
            // Retry after 3 s so the Electron side survives Python restarts.
            setTimeout(() => this.connect(), 3000);
        });
        this.ws.on('error', (error) => {
            console.error('WebSocket Error:', error);
        });
    }
    /** Serialize and send `data` when the socket is open; log otherwise. */
    send(data) {
        if (this.ws && this.ws.readyState === WebSocket.OPEN) {
            this.ws.send(JSON.stringify(data));
        } else {
            console.error('WebSocket not connected');
        }
    }
    /** React to notifications pushed proactively by the Python side. */
    handleNotification(notification) {
        // Fix: corrected "Handel" typo in the log output.
        console.log('Handle notification:', notification);
        // UI updates or follow-up actions can be triggered here.
    }
}

42
src/main/wsConnector.ts Normal file
View File

@@ -0,0 +1,42 @@
import WebSocket from 'ws';
export class WebSocketConnector {
    /** Underlying socket, or null before connect() is called. */
    ws: WebSocket | null;
    constructor() {
        this.ws = null;
    }
    /** Connect to the local Python WebSocket server and register handlers. */
    connect() {
        this.ws = new WebSocket('ws://localhost:8765');
        this.ws.on('open', () => {
            console.log('INFO websocket server connected');
            this.send({ message: 'Electron Initialized' });
        });
        // Fix: wrap in an arrow so `this` stays bound if handleMessage ever
        // needs instance state (the original passed the unbound method).
        this.ws.on('message', (data) => this.handleMessage(data));
        this.ws.on('close', () => {
            console.log('INFO websocket connection closed');
        });
        this.ws.on('error', (error) => {
            console.error('ERROR websocket error:', error);
        });
    }
    /** Parse and log one incoming frame; malformed JSON is reported, not thrown. */
    handleMessage(data: any) {
        // Fix: guard JSON.parse (a bad frame crashed the main process) and
        // corrected the "webscoket" typo in the log output.
        try {
            const message = JSON.parse(data.toString());
            console.log('INFO get message from websocket:', message);
        } catch (err) {
            console.error('ERROR invalid JSON message:', err);
        }
    }
    /** Send `data` as JSON when the socket is open; log an error otherwise. */
    send(data: object) {
        if (this.ws && this.ws.readyState === WebSocket.OPEN) {
            this.ws.send(JSON.stringify(data));
        } else {
            console.error('ERROR send error: websocket not connected');
        }
    }
}