fix: circuit breaker deadlock — always query sidecar for status
The circuit breaker opened after MAX_RECOVERY_ATTEMPTS failures but was never reset because the sidecar status query (which calls circuit_reset()) was skipped when the circuit was open. This caused a permanent deadlock: all subsequent requests went to the LXC fallback with no recovery possible. Fix: always query the sidecar for /models/status, even when the circuit is open. If the sidecar responds successfully, reset the circuit. The circuit breaker now only prevents the SWITCH operation, not the status health check. If a model is already running when the circuit is open, route to it directly.
This commit is contained in:
parent
bcf45129f1
commit
7e9b3f43e1
153
main.py
153
main.py
@ -403,84 +403,85 @@ async def proxy(
|
|||||||
error: Optional[str] = None
|
error: Optional[str] = None
|
||||||
sidecar_status = None
|
sidecar_status = None
|
||||||
|
|
||||||
# Circuit breaker check
|
# Always query the sidecar first (to detect recovery even when circuit is open)
|
||||||
if not await circuit_breaker_check():
|
async with httpx.AsyncClient(timeout=3.0) as client:
|
||||||
|
try:
|
||||||
|
resp = await client.get(f"{SIDECAR_URL}/models/status")
|
||||||
|
if resp.status_code == 200:
|
||||||
|
sidecar_status = resp.json()
|
||||||
|
circuit_reset()
|
||||||
|
except Exception:
|
||||||
|
pass # Handled below
|
||||||
|
|
||||||
|
if sidecar_status is None:
|
||||||
|
circuit_record_failure()
|
||||||
|
error = "sidecar_down"
|
||||||
|
elif not await circuit_breaker_check():
|
||||||
|
# Sidecar is up but circuit is open from prior switch failures
|
||||||
|
# Only block the switch — allow routing to already-active backend
|
||||||
error = "circuit_open"
|
error = "circuit_open"
|
||||||
|
if sidecar_status.get("llama_server_running"):
|
||||||
|
target_url = f"{MAIN_PC_BASE}/{path}"
|
||||||
else:
|
else:
|
||||||
# Query Sidecar for active model
|
# Both sidecar reachable and circuit closed — proceed normally
|
||||||
sidecar_status = None
|
body = await request.body()
|
||||||
async with httpx.AsyncClient(timeout=3.0) as client:
|
body_data = json.loads(body) if body else {}
|
||||||
try:
|
requested_model = body_data.get("model")
|
||||||
resp = await client.get(f"{SIDECAR_URL}/models/status")
|
|
||||||
if resp.status_code == 200:
|
|
||||||
sidecar_status = resp.json()
|
|
||||||
circuit_reset()
|
|
||||||
except Exception:
|
|
||||||
error = "sidecar_down"
|
|
||||||
|
|
||||||
if sidecar_status is None:
|
if requested_model and sidecar_status.get("active_profile") == requested_model:
|
||||||
circuit_record_failure()
|
target_url = f"{MAIN_PC_BASE}/{path}"
|
||||||
error = "sidecar_down"
|
elif requested_model:
|
||||||
else:
|
# Trigger switch for a specific model request
|
||||||
# Extract requested model from request body
|
# Check if a switch is already in progress
|
||||||
body = await request.body()
|
current_switch = await wait_for_switch()
|
||||||
body_data = json.loads(body) if body else {}
|
|
||||||
requested_model = body_data.get("model")
|
|
||||||
|
|
||||||
if requested_model and sidecar_status.get("active_profile") == requested_model:
|
if current_switch is not None and not current_switch.is_set():
|
||||||
target_url = f"{MAIN_PC_BASE}/{path}"
|
# Another request started the switch — queue this one
|
||||||
elif requested_model:
|
|
||||||
# Trigger switch for a specific model request
|
|
||||||
# Check if a switch is already in progress
|
|
||||||
current_switch = await wait_for_switch()
|
|
||||||
|
|
||||||
if current_switch is not None and not current_switch.is_set():
|
|
||||||
# Another request started the switch — queue this one
|
|
||||||
try:
|
|
||||||
wait_evt = await queue_request()
|
|
||||||
except HTTPException as he:
|
|
||||||
raise
|
|
||||||
|
|
||||||
# SSE progress while waiting
|
|
||||||
async def stream_with_sse():
|
|
||||||
sse_gen = sse_progress_stream(wait_evt)
|
|
||||||
try:
|
|
||||||
await wait_evt.wait()
|
|
||||||
async for sse_chunk in sse_gen:
|
|
||||||
yield sse_chunk
|
|
||||||
complete_switch()
|
|
||||||
drain_queue()
|
|
||||||
async with httpx.AsyncClient(timeout=60.0) as c:
|
|
||||||
req_headers = dict(request.headers)
|
|
||||||
req_headers.pop("host", None)
|
|
||||||
async with c.stream(
|
|
||||||
request.method,
|
|
||||||
f"{MAIN_PC_BASE}/{path}",
|
|
||||||
content=body,
|
|
||||||
headers=req_headers,
|
|
||||||
) as resp:
|
|
||||||
async for chunk in resp.aiter_bytes():
|
|
||||||
yield chunk
|
|
||||||
finally:
|
|
||||||
# Clean up sse_gen
|
|
||||||
try:
|
|
||||||
await sse_gen.aclose()
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
return StreamingResponse(
|
|
||||||
stream_with_sse(),
|
|
||||||
media_type="text/event-stream",
|
|
||||||
)
|
|
||||||
|
|
||||||
# First request triggers the switch
|
|
||||||
await start_switch() # Create event for tracking
|
|
||||||
try:
|
try:
|
||||||
async with httpx.AsyncClient(timeout=120.0) as client:
|
wait_evt = await queue_request()
|
||||||
switch_resp = await client.post(
|
except HTTPException as he:
|
||||||
f"{SIDECAR_URL}/models/switch",
|
raise
|
||||||
json={"profile_id": requested_model},
|
|
||||||
)
|
# SSE progress while waiting
|
||||||
|
async def stream_with_sse():
|
||||||
|
sse_gen = sse_progress_stream(wait_evt)
|
||||||
|
try:
|
||||||
|
await wait_evt.wait()
|
||||||
|
async for sse_chunk in sse_gen:
|
||||||
|
yield sse_chunk
|
||||||
|
complete_switch()
|
||||||
|
drain_queue()
|
||||||
|
async with httpx.AsyncClient(timeout=60.0) as c:
|
||||||
|
req_headers = dict(request.headers)
|
||||||
|
req_headers.pop("host", None)
|
||||||
|
async with c.stream(
|
||||||
|
request.method,
|
||||||
|
f"{MAIN_PC_BASE}/{path}",
|
||||||
|
content=body,
|
||||||
|
headers=req_headers,
|
||||||
|
) as resp:
|
||||||
|
async for chunk in resp.aiter_bytes():
|
||||||
|
yield chunk
|
||||||
|
finally:
|
||||||
|
# Clean up sse_gen
|
||||||
|
try:
|
||||||
|
await sse_gen.aclose()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return StreamingResponse(
|
||||||
|
stream_with_sse(),
|
||||||
|
media_type="text/event-stream",
|
||||||
|
)
|
||||||
|
|
||||||
|
# First request triggers the switch
|
||||||
|
await start_switch() # Create event for tracking
|
||||||
|
try:
|
||||||
|
async with httpx.AsyncClient(timeout=120.0) as client:
|
||||||
|
switch_resp = await client.post(
|
||||||
|
f"{SIDECAR_URL}/models/switch",
|
||||||
|
json={"profile_id": requested_model},
|
||||||
|
)
|
||||||
switch_result = switch_resp.json()
|
switch_result = switch_resp.json()
|
||||||
if switch_result.get("status") == "ready":
|
if switch_result.get("status") == "ready":
|
||||||
complete_switch()
|
complete_switch()
|
||||||
@ -495,9 +496,9 @@ async def proxy(
|
|||||||
f"message={switch_result.get('message', '(no message)')}",
|
f"message={switch_result.get('message', '(no message)')}",
|
||||||
flush=True,
|
flush=True,
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
circuit_record_failure()
|
circuit_record_failure()
|
||||||
error = f"switch_error: {str(e)}"
|
error = f"switch_error: {str(e)}"
|
||||||
else:
|
else:
|
||||||
# No model in request body (probe/GET/non-chat request) —
|
# No model in request body (probe/GET/non-chat request) —
|
||||||
# route to the currently active backend when available,
|
# route to the currently active backend when available,
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user