From fb0569c2986a0e4a67b0bca130ec02a4bc1633f3 Mon Sep 17 00:00:00 2001 From: TheSmallHanCat Date: Sun, 11 Jan 2026 13:04:19 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96CF429=E9=94=99?= =?UTF-8?q?=E8=AF=AF=E5=A4=84=E7=90=86=E5=8F=8A=E8=B6=85=E6=97=B6=E6=97=A5?= =?UTF-8?q?=E5=BF=97=E8=AE=B0=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/services/generation_handler.py | 168 +++++++++++++++++++++++++---- 1 file changed, 150 insertions(+), 18 deletions(-) diff --git a/src/services/generation_handler.py b/src/services/generation_handler.py index 81c94d8..916a24f 100644 --- a/src/services/generation_handler.py +++ b/src/services/generation_handler.py @@ -533,7 +533,7 @@ class GenerationHandler: await self.token_manager.record_usage(token_obj.id, is_video=is_video) # Poll for results with timeout - async for chunk in self._poll_task_result(task_id, token_obj.token, is_video, stream, prompt, token_obj.id): + async for chunk in self._poll_task_result(task_id, token_obj.token, is_video, stream, prompt, token_obj.id, log_id, start_time): yield chunk # Record success @@ -591,12 +591,6 @@ class GenerationHandler: if is_video and token_obj and self.concurrency_manager: await self.concurrency_manager.release_video(token_obj.id) - # Record error (check if it's an overload error) - if token_obj: - error_str = str(e).lower() - is_overload = "heavy_load" in error_str or "under heavy load" in error_str - await self.token_manager.record_error(token_obj.id, is_overload=is_overload) - # Parse error message to check if it's a structured error (JSON) error_response = None try: @@ -604,15 +598,31 @@ class GenerationHandler: except: pass + # Check for CF shield/429 error + is_cf_or_429 = False + if error_response and isinstance(error_response, dict): + error_info = error_response.get("error", {}) + if error_info.get("code") == "cf_shield_429": + is_cf_or_429 = True + + # Record error (check if it's an overload error or CF/429 error) + if token_obj: + error_str = str(e).lower() + is_overload = "heavy_load" in error_str or "under heavy load" in error_str + # Don't record error for CF shield/429 (not token's fault) + if not is_cf_or_429: + await self.token_manager.record_error(token_obj.id, is_overload=is_overload) + # Update log entry with error data duration = time.time() - start_time if log_id: if error_response: - # Structured error (e.g., unsupported_country_code) + # Structured error (e.g., unsupported_country_code, cf_shield_429) + status_code = 429 if is_cf_or_429 else 400 await self.db.update_request_log( log_id, response_body=json.dumps(error_response), - status_code=400, + status_code=status_code, duration=duration ) else: @@ -626,7 +636,8 @@ class GenerationHandler: raise e async def _poll_task_result(self, task_id: str, token: str, is_video: bool, - stream: bool, prompt: str, token_id: int = None) -> AsyncGenerator[str, None]: + stream: bool, prompt: str, token_id: int = None, + log_id: int = None, start_time: float = None) -> AsyncGenerator[str, None]: """Poll for task result with timeout""" # Get timeout from config timeout = config.video_timeout if is_video else config.image_timeout @@ -669,7 +680,19 @@ class GenerationHandler: await self.concurrency_manager.release_video(token_id) debug_logger.log_info(f"Released concurrency slot for token {token_id} due to timeout") + # Update task status to failed await self.db.update_task(task_id, "failed", 0, error_message=f"Generation timeout after {elapsed_time:.1f} seconds") + + # Update request log with timeout error + if log_id and start_time: + duration = time.time() - start_time + await self.db.update_request_log( + log_id, + response_body=json.dumps({"error": f"Generation timeout after {elapsed_time:.1f} seconds"}), + status_code=408, + duration=duration + ) + raise Exception(f"Upstream API timeout: Generation exceeded {timeout} seconds limit") @@ -1057,6 +1080,61 @@ class GenerationHandler: ) except Exception as e: + # Check for CF shield/429 error - don't retry these + error_str = str(e) + is_cf_or_429 = False + try: + error_response = json.loads(error_str) + if isinstance(error_response, dict): + error_info = error_response.get("error", {}) + if error_info.get("code") == "cf_shield_429": + is_cf_or_429 = True + except (json.JSONDecodeError, ValueError): + pass + + # CF shield/429 detected - fail immediately + if is_cf_or_429: + debug_logger.log_error( + error_message="CF Shield/429 detected during polling, failing task immediately", + status_code=429, + response_text=error_str + ) + # Update task status to failed + await self.db.update_task(task_id, "failed", 0, error_message="Cloudflare challenge or rate limit (429) triggered") + + # Update request log with CF/429 error + if log_id and start_time: + duration = time.time() - start_time + await self.db.update_request_log( + log_id, + response_body=json.dumps({"error": "Cloudflare challenge or rate limit (429) triggered"}), + status_code=429, + duration=duration + ) + + # Release resources + if not is_video and token_id: + await self.load_balancer.token_lock.release_lock(token_id) + if self.concurrency_manager: + await self.concurrency_manager.release_image(token_id) + if is_video and token_id and self.concurrency_manager: + await self.concurrency_manager.release_video(token_id) + + # Send error message to client if streaming + if stream: + yield self._format_stream_chunk( + reasoning_content="**CF Shield/429 Error**\\n\\nCloudflare challenge or rate limit (429) triggered\\n" + ) + yield self._format_stream_chunk( + content="❌ Generation failed: Cloudflare challenge or rate limit (429) triggered. Please change proxy or reduce request frequency.", + finish_reason="STOP" + ) + yield "data: [DONE]\\n\\n" + + # Exit polling immediately + return + + # For other errors, retry if not last attempt if attempt >= max_attempts - 1: raise e continue @@ -1315,6 +1393,20 @@ class GenerationHandler: yield "data: [DONE]\n\n" except Exception as e: + # Parse error to check for CF shield/429 + error_response = None + try: + error_response = json.loads(str(e)) + except: + pass + + # Check for CF shield/429 error + is_cf_or_429 = False + if error_response and isinstance(error_response, dict): + error_info = error_response.get("error", {}) + if error_info.get("code") == "cf_shield_429": + is_cf_or_429 = True + # Log failed character creation duration = time.time() - start_time await self._log_request( @@ -1328,13 +1420,21 @@ class GenerationHandler: "success": False, "error": str(e) }, - status_code=500, + status_code=429 if is_cf_or_429 else 500, duration=duration ) + # Record error (check if it's an overload error or CF/429 error) + if token_obj: + error_str = str(e).lower() + is_overload = "heavy_load" in error_str or "under heavy load" in error_str + # Don't record error for CF shield/429 (not token's fault) + if not is_cf_or_429: + await self.token_manager.record_error(token_obj.id, is_overload=is_overload) + debug_logger.log_error( error_message=f"Character creation failed: {str(e)}", - status_code=500, + status_code=429 if is_cf_or_429 else 500, response_text=str(e) ) raise @@ -1531,14 +1631,30 @@ class GenerationHandler: duration=duration ) - # Record error (check if it's an overload error) + # Parse error to check for CF shield/429 + error_response = None + try: + error_response = json.loads(str(e)) + except: + pass + + # Check for CF shield/429 error + is_cf_or_429 = False + if error_response and isinstance(error_response, dict): + error_info = error_response.get("error", {}) + if error_info.get("code") == "cf_shield_429": + is_cf_or_429 = True + + # Record error (check if it's an overload error or CF/429 error) if token_obj: error_str = str(e).lower() is_overload = "heavy_load" in error_str or "under heavy load" in error_str - await self.token_manager.record_error(token_obj.id, is_overload=is_overload) + # Don't record error for CF shield/429 (not token's fault) + if not is_cf_or_429: + await self.token_manager.record_error(token_obj.id, is_overload=is_overload) debug_logger.log_error( error_message=f"Character and video generation failed: {str(e)}", - status_code=500, + status_code=429 if is_cf_or_429 else 500, response_text=str(e) ) raise @@ -1624,14 +1740,30 @@ class GenerationHandler: await self.token_manager.record_success(token_obj.id, is_video=True) except Exception as e: - # Record error (check if it's an overload error) + # Parse error to check for CF shield/429 + error_response = None + try: + error_response = json.loads(str(e)) + except: + pass + + # Check for CF shield/429 error + is_cf_or_429 = False + if error_response and isinstance(error_response, dict): + error_info = error_response.get("error", {}) + if error_info.get("code") == "cf_shield_429": + is_cf_or_429 = True + + # Record error (check if it's an overload error or CF/429 error) if token_obj: error_str = str(e).lower() is_overload = "heavy_load" in error_str or "under heavy load" in error_str - await self.token_manager.record_error(token_obj.id, is_overload=is_overload) + # Don't record error for CF shield/429 (not token's fault) + if not is_cf_or_429: + await self.token_manager.record_error(token_obj.id, is_overload=is_overload) debug_logger.log_error( error_message=f"Remix generation failed: {str(e)}", - status_code=500, + status_code=429 if is_cf_or_429 else 500, response_text=str(e) ) raise