feat: add character feature and per-model video durations. fix: resolve the non-streaming test output issue

closes #1
Author: TheSmallHanCat
Date: 2025-11-16 11:04:16 +08:00
Parent: b6cedb0ece
Commit: 42b8311450
14 changed files with 1301 additions and 400 deletions


@@ -3,6 +3,8 @@ import json
import asyncio
import base64
import time
import random
import re
from typing import Optional, AsyncGenerator, Dict, Any
from datetime import datetime
from .sora_client import SoraClient
@@ -31,17 +33,37 @@ MODEL_CONFIG = {
"width": 360,
"height": 540
},
"sora-video": {
# Video models with 10s duration (300 frames)
"sora-video-10s": {
"type": "video",
"orientation": "landscape"
"orientation": "landscape",
"n_frames": 300
},
"sora-video-landscape": {
"sora-video-landscape-10s": {
"type": "video",
"orientation": "landscape"
"orientation": "landscape",
"n_frames": 300
},
"sora-video-portrait": {
"sora-video-portrait-10s": {
"type": "video",
"orientation": "portrait"
"orientation": "portrait",
"n_frames": 300
},
# Video models with 15s duration (450 frames)
"sora-video-15s": {
"type": "video",
"orientation": "landscape",
"n_frames": 450
},
"sora-video-landscape-15s": {
"type": "video",
"orientation": "landscape",
"n_frames": 450
},
"sora-video-portrait-15s": {
"type": "video",
"orientation": "portrait",
"n_frames": 450
}
}
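
The clip duration is now encoded in the model name instead of a database setting: 300 frames for the 10s models and 450 frames for the 15s models, which implies 30 frames per second. A minimal sketch (not part of this commit; `resolve_video_model`, the trimmed config table, and the 30 fps figure are illustrative assumptions) of resolving a model name to its frame count and approximate duration:

```python
# Illustrative only: trimmed stand-in for the MODEL_CONFIG table above.
# 30 fps is inferred from 300 frames ~ 10 s and 450 frames ~ 15 s.
MODEL_CONFIG = {
    "sora-video-10s": {"type": "video", "orientation": "landscape", "n_frames": 300},
    "sora-video-15s": {"type": "video", "orientation": "landscape", "n_frames": 450},
    "sora-video-portrait-15s": {"type": "video", "orientation": "portrait", "n_frames": 450},
}

ASSUMED_FPS = 30  # assumption, not stated in the commit


def resolve_video_model(model: str) -> tuple[str, int, float]:
    """Return (orientation, n_frames, duration_seconds) for a video model name."""
    config = MODEL_CONFIG.get(model)
    if config is None or config["type"] != "video":
        raise ValueError(f"Unknown video model: {model}")
    n_frames = config.get("n_frames", 300)  # same default the handler falls back to
    return config["orientation"], n_frames, n_frames / ASSUMED_FPS


print(resolve_video_model("sora-video-portrait-15s"))  # ('portrait', 450, 15.0)
```
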
@@ -77,11 +99,128 @@ class GenerationHandler:
if "," in image_str:
image_str = image_str.split(",", 1)[1]
return base64.b64decode(image_str)
def _decode_base64_video(self, video_str: str) -> bytes:
"""Decode base64 video"""
# Remove data URI prefix if present
if "," in video_str:
video_str = video_str.split(",", 1)[1]
return base64.b64decode(video_str)
def _process_character_username(self, username_hint: str) -> str:
"""Process character username from API response
Logic:
1. Remove prefix (e.g., "blackwill." from "blackwill.meowliusma68")
2. Keep the remaining part (e.g., "meowliusma68")
3. Append 3 random digits
4. Return final username (e.g., "meowliusma68123")
Args:
username_hint: Original username from API (e.g., "blackwill.meowliusma68")
Returns:
Processed username with 3 random digits appended
"""
# Split by dot and take the last part
if "." in username_hint:
base_username = username_hint.split(".")[-1]
else:
base_username = username_hint
# Generate 3 random digits
random_digits = str(random.randint(100, 999))
# Return final username
final_username = f"{base_username}{random_digits}"
debug_logger.log_info(f"Processed username: {username_hint} -> {final_username}")
return final_username
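
The username transform is pure string handling, so it can be checked in isolation. A standalone mirror of the method's logic (the module-level `process_character_username` helper below is hypothetical, outside the class):

```python
import random


def process_character_username(username_hint: str) -> str:
    """Standalone mirror of _process_character_username, for quick checks."""
    # Keep only the part after the last dot: "blackwill.meowliusma68" -> "meowliusma68"
    base_username = username_hint.split(".")[-1] if "." in username_hint else username_hint
    # Append 3 random digits to reduce collisions with existing usernames
    return f"{base_username}{random.randint(100, 999)}"


print(process_character_username("blackwill.meowliusma68"))  # e.g. meowliusma68417
print(process_character_username("meowliusma68"))            # e.g. meowliusma68902
```
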
def _clean_remix_link_from_prompt(self, prompt: str) -> str:
"""Remove remix link from prompt
Removes both formats:
1. Full URL: https://sora.chatgpt.com/p/s_68e3a06dcd888191b150971da152c1f5
2. Short ID: s_68e3a06dcd888191b150971da152c1f5
Args:
prompt: Original prompt that may contain remix link
Returns:
Cleaned prompt without remix link
"""
if not prompt:
return prompt
# Remove full URL format: https://sora.chatgpt.com/p/s_[a-f0-9]{32}
cleaned = re.sub(r'https://sora\.chatgpt\.com/p/s_[a-f0-9]{32}', '', prompt)
# Remove short ID format: s_[a-f0-9]{32}
cleaned = re.sub(r's_[a-f0-9]{32}', '', cleaned)
# Clean up extra whitespace
cleaned = ' '.join(cleaned.split())
debug_logger.log_info(f"Cleaned prompt: '{prompt}' -> '{cleaned}'")
return cleaned
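
The two regexes can likewise be exercised on their own; a standalone sketch of the same cleaning logic:

```python
import re


def clean_remix_link_from_prompt(prompt: str) -> str:
    """Standalone mirror of _clean_remix_link_from_prompt."""
    if not prompt:
        return prompt
    # Strip the full share URL first, then any bare s_<32 hex> ID left behind
    cleaned = re.sub(r'https://sora\.chatgpt\.com/p/s_[a-f0-9]{32}', '', prompt)
    cleaned = re.sub(r's_[a-f0-9]{32}', '', cleaned)
    return ' '.join(cleaned.split())


print(clean_remix_link_from_prompt(
    "make it snow https://sora.chatgpt.com/p/s_68e3a06dcd888191b150971da152c1f5"
))  # -> "make it snow"
```
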
async def _download_file(self, url: str) -> bytes:
"""Download file from URL
Args:
url: File URL
Returns:
File bytes
"""
from curl_cffi.requests import AsyncSession
proxy_url = await self.load_balancer.proxy_manager.get_proxy_url()
kwargs = {
"timeout": 30,
"impersonate": "chrome"
}
if proxy_url:
kwargs["proxy"] = proxy_url
async with AsyncSession() as session:
response = await session.get(url, **kwargs)
if response.status_code != 200:
raise Exception(f"Failed to download file: {response.status_code}")
return response.content
async def check_token_availability(self, is_image: bool, is_video: bool) -> bool:
"""Check if tokens are available for the given model type
Args:
is_image: Whether checking for image generation
is_video: Whether checking for video generation
Returns:
True if available tokens exist, False otherwise
"""
token_obj = await self.load_balancer.select_token(for_image_generation=is_image, for_video_generation=is_video)
return token_obj is not None
async def handle_generation(self, model: str, prompt: str,
image: Optional[str] = None,
video: Optional[str] = None,
remix_target_id: Optional[str] = None,
stream: bool = True) -> AsyncGenerator[str, None]:
"""Handle generation request"""
"""Handle generation request
Args:
model: Model name
prompt: Generation prompt
image: Base64 encoded image
video: Base64 encoded video or video URL
remix_target_id: Sora share link video ID for remix
stream: Whether to stream response
"""
start_time = time.time()
# Validate model
@@ -92,6 +231,48 @@ class GenerationHandler:
is_video = model_config["type"] == "video"
is_image = model_config["type"] == "image"
# Non-streaming mode: only check availability
if not stream:
available = await self.check_token_availability(is_image, is_video)
if available:
if is_image:
message = "All tokens available for image generation. Please enable streaming to use the generation feature."
else:
message = "All tokens available for video generation. Please enable streaming to use the generation feature."
else:
if is_image:
message = "No available models for image generation"
else:
message = "No available models for video generation"
yield self._format_non_stream_response(message, is_availability_check=True)
return
# Handle character creation and remix flows for video models
if is_video:
# Remix flow: remix_target_id provided
if remix_target_id:
async for chunk in self._handle_remix(remix_target_id, prompt, model_config):
yield chunk
return
# Character creation flow: video provided
if video:
# Decode video if it's base64
video_data = self._decode_base64_video(video) if video.startswith("data:") or not video.startswith("http") else video
# If no prompt, just create character and return
if not prompt:
async for chunk in self._handle_character_creation_only(video_data, model_config):
yield chunk
return
else:
# If prompt provided, create character and generate video
async for chunk in self._handle_character_and_video_generation(video_data, prompt, model_config):
yield chunk
return
# Streaming mode: proceed with actual generation
# Select token (with lock for image generation, Sora2 quota check for video generation)
token_obj = await self.load_balancer.select_token(for_image_generation=is_image, for_video_generation=is_video)
if not token_obj:
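
With this routing in place, a single `handle_generation` call fans out to four paths depending on its arguments: availability check (non-streaming), remix, character creation only, and character plus video generation. A sketch of how a caller might hit each path; `handler` and every value below (model names, prompts, video sources, remix ID) are illustrative stand-ins:

```python
# Sketch only: `handler` stands for a constructed GenerationHandler.
import asyncio


async def demo(handler):
    # 1) Non-streaming: reports token availability only, no generation happens.
    async for chunk in handler.handle_generation(
        model="sora-video-10s", prompt="a cat surfing", stream=False,
    ):
        print(chunk)

    # 2) Remix: remix_target_id is the s_<32 hex> ID from a Sora share link.
    async for chunk in handler.handle_generation(
        model="sora-video-15s", prompt="make it snow",
        remix_target_id="s_68e3a06dcd888191b150971da152c1f5", stream=True,
    ):
        print(chunk, end="")

    # 3) Character creation only: reference video, empty prompt.
    async for chunk in handler.handle_generation(
        model="sora-video-10s", prompt="",
        video="data:video/mp4;base64,...",  # placeholder, not valid base64
        stream=True,
    ):
        print(chunk, end="")

    # 4) Character + video: reference video plus a prompt; the character is
    #    created, used once, then deleted.
    async for chunk in handler.handle_generation(
        model="sora-video-portrait-15s", prompt="dancing on a rooftop",
        video="https://example.com/reference.mp4", stream=True,
    ):
        print(chunk, end="")


# asyncio.run(demo(handler))  # requires a fully configured GenerationHandler
```
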
@@ -142,10 +323,8 @@ class GenerationHandler:
)
if is_video:
- # Get n_frames from database configuration
- # Default to "10s" (300 frames) if not specified
- video_length_config = await self.db.get_video_length_config()
- n_frames = await self.db.get_n_frames_for_length(video_length_config.default_length)
+ # Get n_frames from model configuration
+ n_frames = model_config.get("n_frames", 300)  # Default to 300 frames (10s)
task_id = await self.sora_client.generate_video(
prompt, token_obj.token,
@@ -476,8 +655,6 @@ class GenerationHandler:
finish_reason="STOP"
)
yield "data: [DONE]\n\n"
- else:
- yield self._format_non_stream_response(local_url, "video")
return
else:
result = await self.sora_client.get_image_tasks(token)
@@ -550,8 +727,6 @@ class GenerationHandler:
finish_reason="STOP"
)
yield "data: [DONE]\n\n"
- else:
- yield self._format_non_stream_response(local_urls[0], "image")
return
elif status == "failed":
@@ -666,12 +841,20 @@ class GenerationHandler:
return f'data: {json.dumps(response)}\n\n'
- def _format_non_stream_response(self, url: str, media_type: str) -> str:
- """Format non-streaming response"""
- if media_type == "video":
- content = f"```html\n<video src='{url}' controls></video>\n```"
- else:
- content = f"![Generated Image]({url})"
+ def _format_non_stream_response(self, content: str, media_type: str = None, is_availability_check: bool = False) -> str:
+ """Format non-streaming response
+ Args:
+ content: Response content (either URL for generation or message for availability check)
+ media_type: Type of media ("video", "image") - only used for generation responses
+ is_availability_check: Whether this is an availability check response
+ """
+ if not is_availability_check:
+ # Generation response with media
+ if media_type == "video":
+ content = f"```html\n<video src='{content}' controls></video>\n```"
+ else:
+ content = f"![Generated Image]({content})"
response = {
"id": f"chatcmpl-{datetime.now().timestamp()}",
@@ -706,3 +889,429 @@ class GenerationHandler:
except Exception as e:
# Don't fail the request if logging fails
print(f"Failed to log request: {e}")
# ==================== Character Creation and Remix Handlers ====================
async def _handle_character_creation_only(self, video_data, model_config: Dict) -> AsyncGenerator[str, None]:
"""Handle character creation only (no video generation)
Flow:
1. Download video if URL, or use bytes directly
2. Upload video to create character
3. Poll for character processing
4. Download and cache avatar
5. Upload avatar
6. Finalize character
7. Set character as public
8. Return success message
"""
token_obj = await self.load_balancer.select_token(for_video_generation=True)
if not token_obj:
raise Exception("No available tokens for character creation")
try:
yield self._format_stream_chunk(
reasoning_content="**Character Creation Begins**\n\nInitializing character creation...\n",
is_first=True
)
# Handle video URL or bytes
if isinstance(video_data, str):
# It's a URL, download it
yield self._format_stream_chunk(
reasoning_content="Downloading video file...\n"
)
video_bytes = await self._download_file(video_data)
else:
video_bytes = video_data
# Step 1: Upload video
yield self._format_stream_chunk(
reasoning_content="Uploading video file...\n"
)
cameo_id = await self.sora_client.upload_character_video(video_bytes, token_obj.token)
debug_logger.log_info(f"Video uploaded, cameo_id: {cameo_id}")
# Step 2: Poll for character processing
yield self._format_stream_chunk(
reasoning_content="Processing video to extract character...\n"
)
cameo_status = await self._poll_cameo_status(cameo_id, token_obj.token)
debug_logger.log_info(f"Cameo status: {cameo_status}")
# Extract character info immediately after polling completes
username_hint = cameo_status.get("username_hint", "character")
display_name = cameo_status.get("display_name_hint", "Character")
# Process username: remove prefix and add 3 random digits
username = self._process_character_username(username_hint)
# Output character name immediately
yield self._format_stream_chunk(
reasoning_content=f"✨ 角色已识别: {display_name} (@{username})\n"
)
# Step 3: Download and cache avatar
yield self._format_stream_chunk(
reasoning_content="Downloading character avatar...\n"
)
profile_asset_url = cameo_status.get("profile_asset_url")
if not profile_asset_url:
raise Exception("Profile asset URL not found in cameo status")
avatar_data = await self.sora_client.download_character_image(profile_asset_url)
debug_logger.log_info(f"Avatar downloaded, size: {len(avatar_data)} bytes")
# Step 4: Upload avatar
yield self._format_stream_chunk(
reasoning_content="Uploading character avatar...\n"
)
asset_pointer = await self.sora_client.upload_character_image(avatar_data, token_obj.token)
debug_logger.log_info(f"Avatar uploaded, asset_pointer: {asset_pointer}")
# Step 5: Finalize character
yield self._format_stream_chunk(
reasoning_content="Finalizing character creation...\n"
)
# instruction_set_hint is a string, but instruction_set in cameo_status might be an array
instruction_set = cameo_status.get("instruction_set_hint") or cameo_status.get("instruction_set")
character_id = await self.sora_client.finalize_character(
cameo_id=cameo_id,
username=username,
display_name=display_name,
profile_asset_pointer=asset_pointer,
instruction_set=instruction_set,
token=token_obj.token
)
debug_logger.log_info(f"Character finalized, character_id: {character_id}")
# Step 6: Set character as public
yield self._format_stream_chunk(
reasoning_content="Setting character as public...\n"
)
await self.sora_client.set_character_public(cameo_id, token_obj.token)
debug_logger.log_info(f"Character set as public")
# Step 7: Return success message
yield self._format_stream_chunk(
content=f"角色创建成功,角色名@{username}",
finish_reason="STOP"
)
yield "data: [DONE]\n\n"
except Exception as e:
debug_logger.log_error(
error_message=f"Character creation failed: {str(e)}",
status_code=500,
response_text=str(e)
)
raise
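
On the wire, the creation flow is a stream of SSE chunks: progress lines in `reasoning_content`, the final confirmation in `content`, then `data: [DONE]`. A sketch of a consumer that separates the two; the `choices[0].delta` field names are assumptions based on the arguments passed to `_format_stream_chunk` above:

```python
# Sketch: split progress (reasoning_content) from the final message (content)
# while reading the SSE stream. Delta field names are assumed.
import json


def consume_sse_lines(lines):
    progress, final = [], []
    for line in lines:
        if not line.startswith("data: "):
            continue
        data = line[len("data: "):].strip()
        if data == "[DONE]":
            break
        delta = json.loads(data).get("choices", [{}])[0].get("delta", {})
        if delta.get("reasoning_content"):
            progress.append(delta["reasoning_content"])
        if delta.get("content"):
            final.append(delta["content"])
    return "".join(progress), "".join(final)


demo_lines = [
    'data: {"choices":[{"delta":{"reasoning_content":"Uploading video file...\\n"}}]}',
    'data: {"choices":[{"delta":{"content":"Character created: @meowliusma68123"}}]}',
    "data: [DONE]",
]
print(consume_sse_lines(demo_lines))
```
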
async def _handle_character_and_video_generation(self, video_data, prompt: str, model_config: Dict) -> AsyncGenerator[str, None]:
"""Handle character creation and video generation
Flow:
1. Download video if URL, or use bytes directly
2. Upload video to create character
3. Poll for character processing
4. Download and cache avatar
5. Upload avatar
6. Finalize character
7. Generate video with character (@username + prompt)
8. Delete character
9. Return video result
"""
token_obj = await self.load_balancer.select_token(for_video_generation=True)
if not token_obj:
raise Exception("No available tokens for video generation")
character_id = None
try:
yield self._format_stream_chunk(
reasoning_content="**Character Creation and Video Generation Begins**\n\nInitializing...\n",
is_first=True
)
# Handle video URL or bytes
if isinstance(video_data, str):
# It's a URL, download it
yield self._format_stream_chunk(
reasoning_content="Downloading video file...\n"
)
video_bytes = await self._download_file(video_data)
else:
video_bytes = video_data
# Step 1: Upload video
yield self._format_stream_chunk(
reasoning_content="Uploading video file...\n"
)
cameo_id = await self.sora_client.upload_character_video(video_bytes, token_obj.token)
debug_logger.log_info(f"Video uploaded, cameo_id: {cameo_id}")
# Step 2: Poll for character processing
yield self._format_stream_chunk(
reasoning_content="Processing video to extract character...\n"
)
cameo_status = await self._poll_cameo_status(cameo_id, token_obj.token)
debug_logger.log_info(f"Cameo status: {cameo_status}")
# Extract character info immediately after polling completes
username_hint = cameo_status.get("username_hint", "character")
display_name = cameo_status.get("display_name_hint", "Character")
# Process username: remove prefix and add 3 random digits
username = self._process_character_username(username_hint)
# Output character name immediately
yield self._format_stream_chunk(
reasoning_content=f"✨ 角色已识别: {display_name} (@{username})\n"
)
# Step 3: Download and cache avatar
yield self._format_stream_chunk(
reasoning_content="Downloading character avatar...\n"
)
profile_asset_url = cameo_status.get("profile_asset_url")
if not profile_asset_url:
raise Exception("Profile asset URL not found in cameo status")
avatar_data = await self.sora_client.download_character_image(profile_asset_url)
debug_logger.log_info(f"Avatar downloaded, size: {len(avatar_data)} bytes")
# Step 4: Upload avatar
yield self._format_stream_chunk(
reasoning_content="Uploading character avatar...\n"
)
asset_pointer = await self.sora_client.upload_character_image(avatar_data, token_obj.token)
debug_logger.log_info(f"Avatar uploaded, asset_pointer: {asset_pointer}")
# Step 5: Finalize character
yield self._format_stream_chunk(
reasoning_content="Finalizing character creation...\n"
)
# instruction_set_hint is a string, but instruction_set in cameo_status might be an array
instruction_set = cameo_status.get("instruction_set_hint") or cameo_status.get("instruction_set")
character_id = await self.sora_client.finalize_character(
cameo_id=cameo_id,
username=username,
display_name=display_name,
profile_asset_pointer=asset_pointer,
instruction_set=instruction_set,
token=token_obj.token
)
debug_logger.log_info(f"Character finalized, character_id: {character_id}")
# Step 6: Generate video with character
yield self._format_stream_chunk(
reasoning_content="**Video Generation Process Begins**\n\nGenerating video with character...\n"
)
# Prepend @username to prompt
full_prompt = f"@{username} {prompt}"
debug_logger.log_info(f"Full prompt: {full_prompt}")
# Get n_frames from model configuration
n_frames = model_config.get("n_frames", 300) # Default to 300 frames (10s)
task_id = await self.sora_client.generate_video(
full_prompt, token_obj.token,
orientation=model_config["orientation"],
n_frames=n_frames
)
debug_logger.log_info(f"Video generation started, task_id: {task_id}")
# Save task to database
task = Task(
task_id=task_id,
token_id=token_obj.id,
model=f"sora-video-{model_config['orientation']}",
prompt=full_prompt,
status="processing",
progress=0.0
)
await self.db.create_task(task)
# Record usage
await self.token_manager.record_usage(token_obj.id, is_video=True)
# Poll for results
async for chunk in self._poll_task_result(task_id, token_obj.token, True, True, full_prompt, token_obj.id):
yield chunk
# Record success
await self.token_manager.record_success(token_obj.id, is_video=True)
except Exception as e:
# Record error
if token_obj:
await self.token_manager.record_error(token_obj.id)
debug_logger.log_error(
error_message=f"Character and video generation failed: {str(e)}",
status_code=500,
response_text=str(e)
)
raise
finally:
# Step 7: Delete character
if character_id:
try:
yield self._format_stream_chunk(
reasoning_content="Cleaning up temporary character...\n"
)
await self.sora_client.delete_character(character_id, token_obj.token)
debug_logger.log_info(f"Character deleted: {character_id}")
except Exception as e:
debug_logger.log_error(
error_message=f"Failed to delete character: {str(e)}",
status_code=500,
response_text=str(e)
)
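
The character is deliberately short-lived: it is created, used for one generation, then deleted in the `finally` block, and the cleanup notice is itself yielded into the stream before any pending exception continues to propagate. A minimal sketch of that yield-inside-`finally` pattern with a simulated failure:

```python
# Minimal sketch of the create -> use -> always-delete pattern above, with a
# simulated mid-stream failure. Yielding inside `finally` is legal in an async
# generator; the pending exception resumes propagating once the block finishes.
import asyncio
from typing import AsyncGenerator


async def create_use_delete() -> AsyncGenerator[str, None]:
    character_id = None
    try:
        character_id = "char_123"                 # stand-in for finalize_character(...)
        yield "character created\n"
        raise RuntimeError("generation failed")   # simulate a failure during generation
    finally:
        if character_id:
            yield "cleaning up temporary character...\n"  # stand-in for delete_character(...)


async def main():
    try:
        async for chunk in create_use_delete():
            print(chunk, end="")
    except RuntimeError as exc:
        print(f"caught: {exc}")


asyncio.run(main())
# character created
# cleaning up temporary character...
# caught: generation failed
```
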
async def _handle_remix(self, remix_target_id: str, prompt: str, model_config: Dict) -> AsyncGenerator[str, None]:
"""Handle remix video generation
Flow:
1. Select token
2. Clean remix link from prompt
3. Call remix API
4. Poll for results
5. Return video result
"""
token_obj = await self.load_balancer.select_token(for_video_generation=True)
if not token_obj:
raise Exception("No available tokens for remix generation")
task_id = None
try:
yield self._format_stream_chunk(
reasoning_content="**Remix Generation Process Begins**\n\nInitializing remix request...\n",
is_first=True
)
# Clean remix link from prompt to avoid duplication
clean_prompt = self._clean_remix_link_from_prompt(prompt)
# Get n_frames from model configuration
n_frames = model_config.get("n_frames", 300) # Default to 300 frames (10s)
# Call remix API
yield self._format_stream_chunk(
reasoning_content="Sending remix request to server...\n"
)
task_id = await self.sora_client.remix_video(
remix_target_id=remix_target_id,
prompt=clean_prompt,
token=token_obj.token,
orientation=model_config["orientation"],
n_frames=n_frames
)
debug_logger.log_info(f"Remix generation started, task_id: {task_id}")
# Save task to database
task = Task(
task_id=task_id,
token_id=token_obj.id,
model=f"sora-video-{model_config['orientation']}",
prompt=f"remix:{remix_target_id} {clean_prompt}",
status="processing",
progress=0.0
)
await self.db.create_task(task)
# Record usage
await self.token_manager.record_usage(token_obj.id, is_video=True)
# Poll for results
async for chunk in self._poll_task_result(task_id, token_obj.token, True, True, clean_prompt, token_obj.id):
yield chunk
# Record success
await self.token_manager.record_success(token_obj.id, is_video=True)
except Exception as e:
# Record error
if token_obj:
await self.token_manager.record_error(token_obj.id)
debug_logger.log_error(
error_message=f"Remix generation failed: {str(e)}",
status_code=500,
response_text=str(e)
)
raise
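
The remix path expects `remix_target_id` to already be the bare `s_<32 hex>` ID, while `_clean_remix_link_from_prompt` strips any copy of it left in the prompt. A hypothetical helper for callers that only have a share URL, reusing the same pattern:

```python
import re

# Hypothetical helper: extract the s_<32 hex> remix target from a prompt or share
# URL, using the same pattern _clean_remix_link_from_prompt strips out.
REMIX_ID_RE = re.compile(r's_[a-f0-9]{32}')


def extract_remix_target_id(text: str):
    match = REMIX_ID_RE.search(text or "")
    return match.group(0) if match else None


print(extract_remix_target_id(
    "remix this https://sora.chatgpt.com/p/s_68e3a06dcd888191b150971da152c1f5 but at night"
))  # -> s_68e3a06dcd888191b150971da152c1f5
```
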
async def _poll_cameo_status(self, cameo_id: str, token: str, timeout: int = 600, poll_interval: int = 5) -> Dict[str, Any]:
"""Poll for cameo (character) processing status
Args:
cameo_id: The cameo ID
token: Access token
timeout: Maximum time to wait in seconds
poll_interval: Time between polls in seconds
Returns:
Cameo status dictionary with display_name_hint, username_hint, profile_asset_url, instruction_set_hint
"""
start_time = time.time()
max_attempts = int(timeout / poll_interval)
consecutive_errors = 0
max_consecutive_errors = 3 # Allow up to 3 consecutive errors before failing
for attempt in range(max_attempts):
elapsed_time = time.time() - start_time
if elapsed_time > timeout:
raise Exception(f"Cameo processing timeout after {elapsed_time:.1f} seconds")
await asyncio.sleep(poll_interval)
try:
status = await self.sora_client.get_cameo_status(cameo_id, token)
current_status = status.get("status")
status_message = status.get("status_message", "")
# Reset error counter on successful request
consecutive_errors = 0
debug_logger.log_info(f"Cameo status: {current_status} (message: {status_message}) (attempt {attempt + 1}/{max_attempts})")
# Check if processing is complete
# Primary condition: status_message == "Completed" means processing is done
if status_message == "Completed":
debug_logger.log_info(f"Cameo processing completed (status: {current_status}, message: {status_message})")
return status
# Fallback condition: finalized status
if current_status == "finalized":
debug_logger.log_info(f"Cameo processing completed (status: {current_status}, message: {status_message})")
return status
except Exception as e:
consecutive_errors += 1
error_msg = str(e)
# Log error with context
debug_logger.log_error(
error_message=f"Failed to get cameo status (attempt {attempt + 1}/{max_attempts}, consecutive errors: {consecutive_errors}): {error_msg}",
status_code=500,
response_text=error_msg
)
# Check if it's a TLS/connection error
is_tls_error = "TLS" in error_msg or "curl" in error_msg or "OPENSSL" in error_msg
if is_tls_error:
# For TLS errors, use exponential backoff
backoff_time = min(poll_interval * (2 ** (consecutive_errors - 1)), 30)
debug_logger.log_info(f"TLS error detected, using exponential backoff: {backoff_time}s")
await asyncio.sleep(backoff_time)
# Fail if too many consecutive errors
if consecutive_errors >= max_consecutive_errors:
raise Exception(f"Too many consecutive errors ({consecutive_errors}) while polling cameo status: {error_msg}")
# Continue polling on error
continue
raise Exception(f"Cameo processing timeout after {timeout} seconds")