From 7e9b3f43e17d216a0a5b73e085f599440d8c73cc Mon Sep 17 00:00:00 2001 From: root Date: Tue, 16 Jun 2026 22:09:16 +0000 Subject: [PATCH] =?UTF-8?q?fix:=20circuit=20breaker=20deadlock=20=E2=80=94?= =?UTF-8?q?=20always=20query=20sidecar=20for=20status?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The circuit breaker opened after MAX_RECOVERY_ATTEMPTS failures but was never reset because the sidecar status query (which calls circuit_reset()) was skipped when the circuit was open. This caused a permanent deadlock: all subsequent requests went to the LXC fallback with no recovery possible. Fix: always query the sidecar for /models/status, even when the circuit is open. If the sidecar responds successfully, reset the circuit. The circuit breaker now only prevents the SWITCH operation, not the status health check. If a model is already running when the circuit is open, route to it directly. --- main.py | 153 ++++++++++++++++++++++++++++---------------------------- 1 file changed, 77 insertions(+), 76 deletions(-) diff --git a/main.py b/main.py index 60c3ad8..d79e21f 100644 --- a/main.py +++ b/main.py @@ -403,84 +403,85 @@ async def proxy( error: Optional[str] = None sidecar_status = None - # Circuit breaker check - if not await circuit_breaker_check(): + # Always query the sidecar first (to detect recovery even when circuit is open) + async with httpx.AsyncClient(timeout=3.0) as client: + try: + resp = await client.get(f"{SIDECAR_URL}/models/status") + if resp.status_code == 200: + sidecar_status = resp.json() + circuit_reset() + except Exception: + pass # Handled below + + if sidecar_status is None: + circuit_record_failure() + error = "sidecar_down" + elif not await circuit_breaker_check(): + # Sidecar is up but circuit is open from prior switch failures + # Only block the switch — allow routing to already-active backend error = "circuit_open" + if sidecar_status.get("llama_server_running"): + target_url = f"{MAIN_PC_BASE}/{path}" else: - # Query Sidecar for active model - sidecar_status = None - async with httpx.AsyncClient(timeout=3.0) as client: - try: - resp = await client.get(f"{SIDECAR_URL}/models/status") - if resp.status_code == 200: - sidecar_status = resp.json() - circuit_reset() - except Exception: - error = "sidecar_down" + # Both sidecar reachable and circuit closed — proceed normally + body = await request.body() + body_data = json.loads(body) if body else {} + requested_model = body_data.get("model") - if sidecar_status is None: - circuit_record_failure() - error = "sidecar_down" - else: - # Extract requested model from request body - body = await request.body() - body_data = json.loads(body) if body else {} - requested_model = body_data.get("model") + if requested_model and sidecar_status.get("active_profile") == requested_model: + target_url = f"{MAIN_PC_BASE}/{path}" + elif requested_model: + # Trigger switch for a specific model request + # Check if a switch is already in progress + current_switch = await wait_for_switch() - if requested_model and sidecar_status.get("active_profile") == requested_model: - target_url = f"{MAIN_PC_BASE}/{path}" - elif requested_model: - # Trigger switch for a specific model request - # Check if a switch is already in progress - current_switch = await wait_for_switch() - - if current_switch is not None and not current_switch.is_set(): - # Another request started the switch — queue this one - try: - wait_evt = await queue_request() - except HTTPException as he: - raise - - # SSE progress while waiting - async def stream_with_sse(): - sse_gen = sse_progress_stream(wait_evt) - try: - await wait_evt.wait() - async for sse_chunk in sse_gen: - yield sse_chunk - complete_switch() - drain_queue() - async with httpx.AsyncClient(timeout=60.0) as c: - req_headers = dict(request.headers) - req_headers.pop("host", None) - async with c.stream( - request.method, - f"{MAIN_PC_BASE}/{path}", - content=body, - headers=req_headers, - ) as resp: - async for chunk in resp.aiter_bytes(): - yield chunk - finally: - # Clean up sse_gen - try: - await sse_gen.aclose() - except Exception: - pass - - return StreamingResponse( - stream_with_sse(), - media_type="text/event-stream", - ) - - # First request triggers the switch - await start_switch() # Create event for tracking + if current_switch is not None and not current_switch.is_set(): + # Another request started the switch — queue this one try: - async with httpx.AsyncClient(timeout=120.0) as client: - switch_resp = await client.post( - f"{SIDECAR_URL}/models/switch", - json={"profile_id": requested_model}, - ) + wait_evt = await queue_request() + except HTTPException as he: + raise + + # SSE progress while waiting + async def stream_with_sse(): + sse_gen = sse_progress_stream(wait_evt) + try: + await wait_evt.wait() + async for sse_chunk in sse_gen: + yield sse_chunk + complete_switch() + drain_queue() + async with httpx.AsyncClient(timeout=60.0) as c: + req_headers = dict(request.headers) + req_headers.pop("host", None) + async with c.stream( + request.method, + f"{MAIN_PC_BASE}/{path}", + content=body, + headers=req_headers, + ) as resp: + async for chunk in resp.aiter_bytes(): + yield chunk + finally: + # Clean up sse_gen + try: + await sse_gen.aclose() + except Exception: + pass + + return StreamingResponse( + stream_with_sse(), + media_type="text/event-stream", + ) + + # First request triggers the switch + await start_switch() # Create event for tracking + try: + async with httpx.AsyncClient(timeout=120.0) as client: + switch_resp = await client.post( + f"{SIDECAR_URL}/models/switch", + json={"profile_id": requested_model}, + ) switch_result = switch_resp.json() if switch_result.get("status") == "ready": complete_switch() @@ -495,9 +496,9 @@ async def proxy( f"message={switch_result.get('message', '(no message)')}", flush=True, ) - except Exception as e: - circuit_record_failure() - error = f"switch_error: {str(e)}" + except Exception as e: + circuit_record_failure() + error = f"switch_error: {str(e)}" else: # No model in request body (probe/GET/non-chat request) — # route to the currently active backend when available,