feat: 添加资源缓存开关、自动刷新AT fix: 修复邀请码复制、修复移动端无法滑动、优化图片、视频格式输出

This commit is contained in:
TheSmallHanCat
2025-11-09 18:26:37 +08:00
parent 7269e3fa79
commit b6cedb0ece
11 changed files with 382 additions and 113 deletions

View File

@@ -871,3 +871,77 @@ class TokenManager:
print(f"Failed to refresh Sora2 remaining count: {e}")
except Exception as e:
print(f"Error in refresh_sora2_remaining_if_cooldown_expired: {e}")
async def auto_refresh_expiring_token(self, token_id: int) -> bool:
"""
Auto refresh token when expiry time is within 24 hours using ST or RT
Returns:
True if refresh successful, False otherwise
"""
try:
token_data = await self.db.get_token(token_id)
if not token_data:
return False
# Check if token is expiring within 24 hours
if not token_data.expiry_time:
return False # No expiry time set
time_until_expiry = token_data.expiry_time - datetime.now()
hours_until_expiry = time_until_expiry.total_seconds() / 3600
# Only refresh if expiry is within 24 hours (1440 minutes)
if hours_until_expiry > 24:
return False # Token not expiring soon
if hours_until_expiry < 0:
# Token already expired, still try to refresh
print(f"🔄 Token {token_id} 已过期,尝试自动刷新...")
else:
print(f"🔄 Token {token_id} 将在 {hours_until_expiry:.1f} 小时后过期,尝试自动刷新...")
# Priority: ST > RT
new_at = None
new_st = None
new_rt = None
if token_data.st:
# Try to refresh using ST
try:
print(f"📝 使用 ST 刷新 Token {token_id}...")
result = await self.st_to_at(token_data.st)
new_at = result.get("access_token")
# ST refresh doesn't return new ST, so keep the old one
new_st = token_data.st
print(f"✅ 使用 ST 刷新成功")
except Exception as e:
print(f"❌ 使用 ST 刷新失败: {e}")
new_at = None
if not new_at and token_data.rt:
# Try to refresh using RT
try:
print(f"📝 使用 RT 刷新 Token {token_id}...")
result = await self.rt_to_at(token_data.rt)
new_at = result.get("access_token")
new_rt = result.get("refresh_token", token_data.rt) # RT might be updated
print(f"✅ 使用 RT 刷新成功")
except Exception as e:
print(f"❌ 使用 RT 刷新失败: {e}")
new_at = None
if new_at:
# Update token with new AT
await self.update_token(token_id, token=new_at, st=new_st, rt=new_rt)
print(f"✅ Token {token_id} 已自动刷新")
return True
else:
# No ST or RT, disable token
print(f"⚠️ Token {token_id} 无法刷新(无 ST 或 RT已禁用")
await self.disable_token(token_id)
return False
except Exception as e:
print(f"❌ 自动刷新 Token {token_id} 失败: {e}")
return False