The FIRST request that triggered a model switch blocked the HTTP response for 10-30 s while the sidecar loaded the model. Hermes Desktop's client timed out during this wait, so starting a new session appeared to do nothing. Fix: refactored the proxy handler so that ALL requests made during a model switch use the same SSE streaming pattern (an immediate 200, progress events, then the actual response piped through once the switch completes). The switch now runs as a background asyncio task via create_task(). - Added _background_switch(), which runs POST /models/switch in a background task and calls complete_switch() + drain_queue() in its finally block. - All switch-triggering requests go through queue_request() + StreamingResponse. - If the Main PC is unreachable (the switch-failure case), the SSE generator now falls through to OpenRouter/LXC instead of hanging indefinitely. Sidecar fixes from the previous commit: - _kill_llama_server() is now async and properly awaits process termination. - _switch_lock changed from threading.Lock to asyncio.Lock().
703 lines
28 KiB
Python
703 lines
28 KiB
Python
import os
|
||
import asyncio
|
||
import json
|
||
import threading
|
||
from contextlib import asynccontextmanager
|
||
from typing import Optional
|
||
|
||
import httpx
|
||
from fastapi import FastAPI, Request, Response, Header, HTTPException
|
||
from fastapi.responses import StreamingResponse, JSONResponse
|
||
from dotenv import load_dotenv
|
||
|
||
load_dotenv()


# ─── Configuration ───────────────────────────────────────────────────────────
# Every backend URL is normalised to its base (no trailing "/" or "/v1") so
# request paths such as "v1/chat/completions" can be appended verbatim.

SIDECAR_URL = os.getenv("SIDECAR_URL", "http://10.0.4.11:8080").rstrip("/")

MAIN_PC_BASE = os.getenv("MAIN_PC_URL", "http://10.0.4.11:8080/v1").rstrip("/").removesuffix("/v1")

FALLBACK_SLM_URL = os.getenv("FALLBACK_SLM_URL", "http://10.0.4.200:8080/v1").rstrip("/").removesuffix("/v1")

OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "")

# OpenRouter serves its OpenAI-compatible API under /api/v1, so the base must
# include "/api": "https://openrouter.ai/v1/chat/completions" is not an API
# route. Overridable via the OPENROUTER_BASE environment variable.
OPENROUTER_BASE = os.getenv("OPENROUTER_BASE", "https://openrouter.ai/api/v1").rstrip("/").removesuffix("/v1")


print(f"SIDECAR_URL={SIDECAR_URL}")

print(f"MAIN_PC_BASE={MAIN_PC_BASE}")

print(f"FALLBACK_SLM_URL={FALLBACK_SLM_URL}")
|
||
|
||
# ─── Request Queue ───────────────────────────────────────────────────────────
|
||
_MAX_QUEUE_SIZE: int = 10  # most requests allowed to wait on model switches at once
_QUEUE_TIMEOUT: int = 120  # seconds a queued request waits before giving up

# One asyncio.Event per waiting request. drain_queue() sets and clears them;
# _queue_lock serialises the size check and append in queue_request().
_queue_lock: asyncio.Lock = asyncio.Lock()
_queue: list = []
|
||
|
||
|
||
async def queue_request() -> asyncio.Event:
    """Wait in the switch queue until ``drain_queue()`` releases this request.

    Registers a new ``asyncio.Event`` in ``_queue`` and waits up to
    ``_QUEUE_TIMEOUT`` seconds for it to be set.

    Returns:
        The registered event, once it has been set.

    Raises:
        HTTPException: 429 when ``_MAX_QUEUE_SIZE`` requests are already
            queued, or when the wait times out.
    """
    async with _queue_lock:
        if len(_queue) >= _MAX_QUEUE_SIZE:
            raise HTTPException(status_code=429, detail="Server is busy, too many queued requests")
        event = asyncio.Event()
        _queue.append(event)

    try:
        await asyncio.wait_for(event.wait(), timeout=_QUEUE_TIMEOUT)
    except asyncio.TimeoutError:
        raise HTTPException(status_code=429, detail="Request timed out waiting for model switch") from None
    finally:
        # Release the slot on timeout *and* on cancellation (e.g. the client
        # disconnected). After a normal wake-up, drain_queue() has already
        # removed it. These list operations are synchronous, so no other
        # coroutine can interleave with them on the event loop.
        if event in _queue:
            _queue.remove(event)
    return event
|
||
|
||
|
||
def drain_queue():
    """Wake every request waiting in ``_queue`` and empty the queue.

    Called when a model switch finishes, whether it succeeded or not. The
    function is synchronous and never awaits, so when it runs on the event
    loop no coroutine can touch ``_queue`` part-way through. No lock is
    needed; a lock created inside the function would be unique to each call
    and so could not exclude anyone anyway.
    """
    waiters = list(_queue)
    _queue.clear()  # clear in place, so no `global` rebinding is needed
    for event in waiters:
        event.set()
|
||
|
||
|
||
# ─── Circuit Breaker ────────────────────────────────────────────────────────
|
||
MAX_RECOVERY_ATTEMPTS: int = 3  # failures tolerated before the circuit opens

_recovery_attempts: int = 0  # failures recorded since the last circuit_reset()
_circuit_open: bool = False  # when True, proxy() does not attempt model switches
_circuit_lock: asyncio.Lock = asyncio.Lock()
|
||
|
||
|
||
async def circuit_breaker_check() -> bool:
    """Report whether a Sidecar switch may be attempted.

    Returns:
        True while the circuit is closed, False once it has been opened.
    """
    async with _circuit_lock:
        opened = _circuit_open
    return not opened
|
||
|
||
|
||
def circuit_reset():
    """Close the circuit and zero the failure counter after a Sidecar success."""
    global _recovery_attempts, _circuit_open
    _recovery_attempts, _circuit_open = 0, False
|
||
|
||
|
||
def circuit_record_failure():
    """Count one Sidecar failure and open the circuit once the limit is hit.

    The "OPENED" message is printed on every call whose count is at or
    beyond ``MAX_RECOVERY_ATTEMPTS``, not only on the transition.
    """
    global _recovery_attempts, _circuit_open
    _recovery_attempts = _recovery_attempts + 1
    if _recovery_attempts < MAX_RECOVERY_ATTEMPTS:
        return
    _circuit_open = True
    print(f"Circuit breaker OPENED after {_recovery_attempts} failures")
|
||
|
||
|
||
# ─── SSE Helpers ─────────────────────────────────────────────────────────────
|
||
def _sse_format(event: str, data: dict) -> str:
    """Render one Server-Sent Events frame.

    The frame is an ``event:`` line, a ``data:`` line holding *data* as
    JSON, and the blank line that terminates the frame.
    """
    payload = json.dumps(data)
    return f"event: {event}\ndata: {payload}\n\n"
|
||
|
||
|
||
# ─── Router State ───────────────────────────────────────────────────────────
|
||
# Event for the model switch currently in flight. None, or an already-set
# Event, means no switch is running. Reads and writes happen under
# _switching_lock; it is a threading lock, but no holder awaits while holding it.
_switching_event: Optional[asyncio.Event] = None
_switching_lock = threading.Lock()
|
||
|
||
|
||
async def start_switch():
    """Begin tracking a model switch.

    Installs a fresh, unset ``asyncio.Event`` as ``_switching_event`` unless
    an unset one is already present (i.e. a switch is already in progress).
    Nothing inside awaits.
    """
    global _switching_event
    with _switching_lock:
        in_progress = _switching_event is not None and not _switching_event.is_set()
        if not in_progress:
            _switching_event = asyncio.Event()
|
||
|
||
|
||
async def wait_for_switch():
    """Block until the in-flight model switch, if any, has finished.

    If no switch is running (``_switching_event`` is None or already set),
    return immediately. Otherwise await the event; once it fires, reset
    ``_switching_event`` to None if the current event is in the set state.

    Returns:
        Always ``None``, in both branches. Callers cannot use the return
        value to tell whether a switch was in progress.
    """
    global _switching_event
    with _switching_lock:
        pending = _switching_event
        if pending is not None and pending.is_set():
            pending = None
    if pending is None:
        return None

    await pending.wait()

    with _switching_lock:
        if _switching_event is not None and _switching_event.is_set():
            _switching_event = None
    return None
|
||
|
||
|
||
def complete_switch():
    """Fire the in-flight switch event so everything waiting on it resumes.

    This is a no-op when no switch is tracked or the event is already set.
    """
    with _switching_lock:
        current = _switching_event
        if current is None or current.is_set():
            return
        current.set()
|
||
|
||
|
||
async def _background_switch(requested_model: str):
    """Load ``requested_model`` on the Sidecar, detached from any HTTP request.

    Meant to be launched with ``asyncio.create_task()``, so the request
    handler can start streaming to the client while the Sidecar works. The
    POST may run up to the 120 s client timeout.

    A JSON reply whose ``status`` is ``"ready"`` is logged as a success. Any
    other status, or any exception (network error, non-JSON body, ...), is
    logged and counted via ``circuit_record_failure()``.

    Whatever happens, the switch is then marked complete and the queue is
    drained. Waiting requests resume and, if the switch failed, can fall
    back to other backends.
    """
    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            reply = await client.post(
                f"{SIDECAR_URL}/models/switch",
                json={"profile_id": requested_model},
            )
            outcome = reply.json()
            status = outcome.get("status")
            if status != "ready":
                circuit_record_failure()
                print(
                    f"SWITCH FAILED: profile={requested_model}, "
                    f"status={status}, "
                    f"message={outcome.get('message', '(no message)')}",
                    flush=True,
                )
            else:
                print(f"SWITCH SUCCESS: profile={requested_model}", flush=True)
    except Exception as exc:
        circuit_record_failure()
        print(
            f"SWITCH EXCEPTION: profile={requested_model}, "
            f"error={type(exc).__name__}: {exc}",
            flush=True,
        )
    finally:
        # Release everyone waiting on this switch, whether it succeeded or not.
        complete_switch()
        drain_queue()
|
||
|
||
|
||
# ─── App ─────────────────────────────────────────────────────────────────────
|
||
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: log start-up, serve, then log shutdown."""
    print("Intelligence Router starting")
    yield  # the application serves requests while suspended here
    print("Intelligence Router shutting down")


# The FastAPI application. The route decorators below register on it.
app = FastAPI(lifespan=lifespan)
|
||
|
||
|
||
# ─── GET /v1/models — Issue #2 ──────────────────────────────────────────────
|
||
@app.get("/v1")
async def v1_root():
    """Answer the Hermes Desktop WebUI probe of the OpenAI API root.

    Responds with an empty OpenAI-style list object.
    """
    return dict(object="list", data=[])
|
||
|
||
|
||
@app.get("/v1/models")
async def get_models():
    """OpenAI-compatible ``/v1/models``: list the Sidecar's model profiles.

    Returns:
        ``{"object": "list", "data": [...]}`` with one entry per profile
        that has an ``id``. Returns a 503 JSON response when the Sidecar is
        unreachable, answers with an error status, or returns something
        other than a JSON list.
    """
    async with httpx.AsyncClient(timeout=5.0) as client:
        try:
            resp = await client.get(f"{SIDECAR_URL}/models/available")
            resp.raise_for_status()
            profiles = resp.json()
        except Exception:
            profiles = None

    # A non-list payload (e.g. an error object) would otherwise be iterated
    # key by key and crash on p["id"].
    if not isinstance(profiles, list):
        return JSONResponse(
            status_code=503,
            content={"error": "Sidecar unavailable", "data": []},
        )

    models_data = [
        {"id": p["id"], "object": "model", "owned_by": "sidecar"}
        for p in profiles
        if isinstance(p, dict) and "id" in p
    ]
    return {"object": "list", "data": models_data}
|
||
|
||
|
||
# ─── GET /health ─────────────────────────────────────────────────────────────
|
||
@app.get("/health")
async def health():
    """Liveness probe: reports that the router process itself is up."""
    payload = {"status": "router_online"}
    return payload
|
||
|
||
|
||
# ─── Hermes Desktop Probe Endpoints ──────────────────────────────────────────
|
||
# These endpoints are probed by Hermes Desktop to validate/identify the
|
||
# provider before allowing model switching. Without them the desktop
|
||
# returns 503 and refuses to switch models.
|
||
|
||
@app.get("/v1/models/{model_id:path}")
async def get_single_model(model_id: str):
    """OpenAI-compatible single-model lookup, answered from the Sidecar's profile list.

    Returns:
        The model object for the profile whose ``id`` equals *model_id*.
        Otherwise a 404 response, or a 503 response when the Sidecar is
        unreachable, answers with an error status, or does not return a
        JSON list.
    """
    async with httpx.AsyncClient(timeout=5.0) as client:
        try:
            resp = await client.get(f"{SIDECAR_URL}/models/available")
            resp.raise_for_status()
            profiles = resp.json()
        except Exception:
            profiles = None

    if not isinstance(profiles, list):
        return JSONResponse(
            status_code=503,
            content={"error": "Sidecar unavailable", "data": []},
        )

    match = next(
        (p for p in profiles if isinstance(p, dict) and p.get("id") == model_id),
        None,
    )
    if match is None:
        return JSONResponse(status_code=404, content={"error": "model not found", "id": model_id})
    return {"id": match["id"], "object": "model", "owned_by": "sidecar"}
|
||
|
||
|
||
@app.get("/api/tags")
async def ollama_tags():
    """Ollama-compatible ``/api/tags`` model list for Hermes Desktop discovery.

    Returns one entry per Sidecar profile. Any failure to obtain a JSON list
    of profiles (unreachable Sidecar, error status, unexpected payload)
    yields an empty list rather than an error, just as the unreachable case
    already did.
    """
    async with httpx.AsyncClient(timeout=5.0) as client:
        try:
            resp = await client.get(f"{SIDECAR_URL}/models/available")
            resp.raise_for_status()
            profiles = resp.json()
        except Exception:
            return JSONResponse(content={"models": []})

    if not isinstance(profiles, list):
        return JSONResponse(content={"models": []})

    models = [
        {
            "name": p.get("id", ""),
            "model": p.get("id", ""),
            # Fixed placeholder values; they are not read from the Sidecar.
            "modified_at": "2025-01-01T00:00:00Z",
            "size": 0,
            "digest": "",
            "details": {"format": "gguf", "family": p.get("name", "llm")},
        }
        for p in profiles
        if isinstance(p, dict)
    ]
    return {"models": models}
|
||
|
||
|
||
@app.get("/api/show")
async def ollama_show_get(model: str = ""):
    """Ollama ``/api/show`` via GET, with the model name passed as ``?model=``.

    Some Hermes Desktop versions probe this way. Delegates to
    ``_ollama_show_lookup``.
    """
    result = await _ollama_show_lookup(model)
    return result
|
||
|
||
|
||
@app.post("/api/show")
async def ollama_show_post(request: Request):
    """Ollama ``/api/show`` via POST with a JSON body such as ``{"model": "..."}``.

    A missing, malformed, or non-object body, or a non-string ``model``, is
    treated like an empty model name instead of failing with a 500.
    """
    raw = await request.body()
    try:
        body_data = json.loads(raw) if raw else {}
    except ValueError:  # JSONDecodeError and UnicodeDecodeError both subclass it
        body_data = {}
    if not isinstance(body_data, dict):
        body_data = {}

    model_name = body_data.get("model", "")
    if not isinstance(model_name, str):
        model_name = ""
    return await _ollama_show_lookup(model_name)
|
||
|
||
|
||
def _ollama_show_payload(profile: dict) -> dict:
    """Build the Ollama ``/api/show`` response body for one Sidecar profile.

    The context size comes from the profile's ``flags`` (``ctx-size``, then
    ``n_ctx``), defaulting to "4096" when neither is present.
    """
    flags = profile.get("flags")
    if not isinstance(flags, dict):
        flags = {}
    ctx_size = str(flags.get("ctx-size", flags.get("n_ctx", "4096")))
    return {
        "modelfile": "",
        "parameters": f"num_ctx {ctx_size}",
        "template": "",
        "details": {
            "format": "gguf",
            "family": profile.get("name", "llm"),
            "parameter_size": ctx_size,
        },
        "model_info": {"id": profile.get("id", "")},
    }


async def _ollama_show_lookup(model_name: str):
    """Shared logic for the Ollama ``/api/show`` model-info lookup.

    When *model_name* is empty (Hermes Desktop probing without a model
    field), the currently-active profile is described instead, so the
    desktop learns the real context size rather than defaulting to 256k.
    The Sidecar status is only fetched in that case, so named lookups do not
    depend on ``/models/status``.

    Returns:
        The Ollama show payload, or a 404 JSON response when the profile
        cannot be determined or the Sidecar cannot be read.
    """
    wanted = model_name
    async with httpx.AsyncClient(timeout=5.0) as client:
        try:
            resp = await client.get(f"{SIDECAR_URL}/models/available")
            profiles = resp.json()
            if not wanted:
                status_resp = await client.get(f"{SIDECAR_URL}/models/status")
                status = status_resp.json()
                if isinstance(status, dict):
                    wanted = status.get("active_profile")
        except Exception:
            return JSONResponse(status_code=404, content={"error": "model not found"})

    if wanted and isinstance(profiles, list):
        for profile in profiles:
            if isinstance(profile, dict) and profile.get("id") == wanted:
                return _ollama_show_payload(profile)
    return JSONResponse(status_code=404, content={"error": "model not found"})
|
||
|
||
|
||
@app.get("/api/v1/models")
async def ollama_v1_models():
    """Alias of ``/v1/models`` served at the Ollama-style ``/api/v1/models`` path."""
    listing = await get_models()
    return listing
|
||
|
||
|
||
@app.get("/v1/props")
async def llama_cpp_props():
    """llama.cpp-style ``/v1/props`` discovery answer for Hermes Desktop.

    Combines fixed endpoint metadata with the Sidecar's reported active
    profile and llama-server state. If the Sidecar status cannot be fetched
    or parsed, those fields are reported as None and False.
    """
    unknown_status = {"active_profile": None, "llama_server_running": False}
    async with httpx.AsyncClient(timeout=3.0) as client:
        try:
            status = (await client.get(f"{SIDECAR_URL}/models/status")).json()
        except Exception:
            status = unknown_status

    endpoint_props = {
        "version": 1,
        "total_slots": 1,
        "chat_endpoint": "/v1/chat/completions",
        "completion_endpoint": "/v1/completions",
        "embedding_endpoint": "/v1/embeddings",
        "rerank_endpoint": "",
        "health_endpoint": "/health",
    }
    return {
        "props": endpoint_props,
        "active_profile": status.get("active_profile"),
        "server_running": status.get("llama_server_running", False),
    }
|
||
|
||
|
||
@app.get("/props")
async def llm_props():
    """Legacy ``/props`` path; answers exactly like ``/v1/props``."""
    props = await llama_cpp_props()
    return props
|
||
|
||
|
||
@app.get("/version")
async def llm_version():
    """Fixed llama.cpp-style version info identifying this router proxy."""
    version_info = dict(version="0.2.0", build="router-proxy", commit="intelligence-router")
    return version_info
|
||
|
||
|
||
# ─── GET /models/status ──────────────────────────────────────────────────────
|
||
@app.get("/models/status")
async def router_model_status():
    """Proxy ``GET /models/status`` to the Sidecar.

    The Sidecar's JSON body is relayed together with its HTTP status code,
    so an error reported by the Sidecar is not disguised as a 200. Returns
    503 when the Sidecar is unreachable or its reply is not JSON.
    """
    async with httpx.AsyncClient(timeout=5.0) as client:
        try:
            resp = await client.get(f"{SIDECAR_URL}/models/status")
            payload = resp.json()
        except Exception:
            return JSONResponse(
                status_code=503,
                content={"error": "Sidecar unavailable"},
            )
    return JSONResponse(status_code=resp.status_code, content=payload)
|
||
|
||
|
||
# ─── POST /models/switch — Issue #3 ──────────────────────────────────────────
|
||
@app.post("/models/switch")
async def router_model_switch(request: dict):
    """Proxy ``POST /models/switch`` to the Sidecar.

    Body: ``{"profile_id": "<id>"}``. The Sidecar's JSON reply is relayed
    with its own HTTP status code, so a failed switch is not reported as a
    200.

    Raises:
        HTTPException: 400 when ``profile_id`` is missing or empty.

    Returns 503 with ``status: error`` when the Sidecar cannot be reached or
    does not answer with JSON.

    NOTE(review): this direct switch does not go through the router's switch
    tracking (start_switch/complete_switch/drain_queue), so proxied chat
    requests are unaware of it.
    """
    profile_id = request.get("profile_id")
    if not profile_id:
        raise HTTPException(status_code=400, detail="profile_id is required")

    async with httpx.AsyncClient(timeout=120.0) as client:
        try:
            resp = await client.post(
                f"{SIDECAR_URL}/models/switch",
                json={"profile_id": profile_id},
            )
            payload = resp.json()
        except Exception as e:
            return JSONResponse(
                status_code=503,
                content={"status": "error", "message": f"Sidecar error: {str(e)}"},
            )
    return JSONResponse(status_code=resp.status_code, content=payload)
|
||
|
||
|
||
# ─── SSE Progress Stream Generator ───────────────────────────────────────────
|
||
async def sse_progress_stream(event: asyncio.Event):
    """Yield ``model_switching`` SSE frames describing a switch in progress.

    Emits up to three phase frames, two seconds apart. If *event* is already
    set when a phase comes due, that phase's frame is followed by a "Switch
    complete" frame and the stream ends. Otherwise a final "Processing your
    request..." frame follows the last phase.
    """
    phases = (
        ("stopping", "Stopping current model..."),
        ("starting", "Loading new model..."),
        ("waiting", "Waiting for model to be ready..."),
    )
    for phase, message in phases:
        already_done = event.is_set()  # sampled before yielding this phase
        yield _sse_format("model_switching", {"phase": phase, "message": message})
        if already_done:
            yield _sse_format("model_switching", {"phase": "complete", "message": "Switch complete"})
            return
        await asyncio.sleep(2)

    yield _sse_format("model_switching", {"phase": "complete", "message": "Processing your request..."})
|
||
|
||
|
||
# ─── Proxy Endpoint — Issues #2–#7 ───────────────────────────────────────────
|
||
# Strong references to running background switch tasks. asyncio keeps only
# weak references to tasks, so a create_task() result that nobody holds can
# be garbage-collected before it finishes.
_switch_tasks: set = set()

# Profile id the in-flight background switch is loading. It is only
# meaningful while a switch is in progress.
_switch_target: Optional[str] = None

# Seconds between SSE keep-alive comments while a request waits on a switch.
_KEEPALIVE_INTERVAL = 5.0

# Request paths that may trigger a model switch.
_CHAT_PATHS = ("v1/chat/completions", "v1/completions")

# Upstream response headers that must not be copied onto our response. httpx
# hands us a decoded body, so encoding/length would be wrong, and the rest
# are hop-by-hop headers.
_UNFORWARDED_RESPONSE_HEADERS = frozenset(
    {"connection", "content-encoding", "content-length", "keep-alive", "transfer-encoding", "upgrade"}
)


def _parse_json_object(body: bytes) -> dict:
    """Decode *body* as a JSON object; return {} if it is empty, invalid, or not an object."""
    if not body:
        return {}
    try:
        parsed = json.loads(body)
    except ValueError:  # JSONDecodeError and UnicodeDecodeError both subclass it
        return {}
    return parsed if isinstance(parsed, dict) else {}


def _forwardable_request_headers(request: Request) -> dict:
    """Return the client's request headers to send upstream; httpx recomputes host and length."""
    headers = dict(request.headers)
    headers.pop("host", None)
    headers.pop("content-length", None)
    return headers


def _with_bearer(headers: dict, api_key: str) -> dict:
    """Return a copy of *headers* whose only Authorization header is ``Bearer <api_key>``.

    Starlette lower-cases header names, so the client's own ``authorization``
    entry is dropped first; otherwise two Authorization headers would be sent.
    """
    out = {k: v for k, v in headers.items() if k.lower() != "authorization"}
    out["authorization"] = f"Bearer {api_key}"
    return out


def _headers_for(target_url: str, headers: dict) -> dict:
    """Return request headers for *target_url*: OpenRouter gets our API key, others pass through unchanged."""
    if OPENROUTER_API_KEY and target_url.startswith(f"{OPENROUTER_BASE}/"):
        return _with_bearer(headers, OPENROUTER_API_KEY)
    return headers


def _relayable_response_headers(upstream: httpx.Response) -> dict:
    """Return the upstream response headers that are safe to copy onto our own response."""
    return {
        k: v
        for k, v in upstream.headers.items()
        if k.lower() not in _UNFORWARDED_RESPONSE_HEADERS
    }


def _wants_stream(request: Request, body_data: dict) -> bool:
    """True if the client asked for a streamed reply (Accept header or ``"stream": true``)."""
    accept = request.headers.get("accept", "")
    return (
        "text/event-stream" in accept
        or "application/x-ndjson" in accept
        or body_data.get("stream") is True
    )


async def _fetch_sidecar_status() -> Optional[dict]:
    """GET the Sidecar's ``/models/status``; return None unless it is a 200 JSON object.

    A successful read resets the circuit breaker, so recovery is noticed even
    while the circuit is open.
    """
    async with httpx.AsyncClient(timeout=3.0) as client:
        try:
            resp = await client.get(f"{SIDECAR_URL}/models/status")
        except Exception:
            return None
    if resp.status_code != 200:
        return None
    try:
        status = resp.json()
    except ValueError:
        return None
    if not isinstance(status, dict):
        return None
    circuit_reset()
    return status


async def _forward(request: Request, target: str, body: bytes, headers: dict, stream: bool) -> Response:
    """Send the client's request to *target* and wrap the upstream reply.

    Connection and protocol errors propagate, so the caller can try the next
    backend. Non-200 replies are logged and relayed unchanged.
    """
    send_headers = _headers_for(target, headers)
    if not stream:
        async with httpx.AsyncClient(timeout=60.0) as client:
            resp = await client.request(request.method, target, content=body, headers=send_headers)
        if resp.status_code != 200:
            body_preview = resp.content[:500].decode("utf-8", errors="replace")
            print(
                f"PROXY: {request.method} {target} returned {resp.status_code}: {body_preview}",
                flush=True,
            )
        return Response(
            content=resp.content,
            status_code=resp.status_code,
            headers=_relayable_response_headers(resp),
        )

    # Streaming: open the upstream response *before* returning, so connection
    # failures still reach the caller's fallback logic. The client must stay
    # open until the body has been relayed. It is therefore closed by a
    # background task that runs once the response is done, not by an
    # `async with` that would exit before streaming starts.
    from fastapi import BackgroundTasks

    client = httpx.AsyncClient(timeout=60.0)
    try:
        upstream = await client.send(
            client.build_request(request.method, target, content=body, headers=send_headers),
            stream=True,
        )
    except BaseException:
        await client.aclose()
        raise
    if upstream.status_code != 200:
        print(f"PROXY: {target} returned {upstream.status_code} during SSE stream", flush=True)

    cleanup = BackgroundTasks()
    cleanup.add_task(upstream.aclose)
    cleanup.add_task(client.aclose)
    return StreamingResponse(
        upstream.aiter_bytes(),
        status_code=upstream.status_code,
        headers=_relayable_response_headers(upstream),
        background=cleanup,
    )


def _switch_in_progress() -> bool:
    """True while a tracked model switch has started but not yet completed."""
    evt = _switching_event
    return evt is not None and not evt.is_set()


async def _join_or_start_switch(requested_model: str) -> tuple:
    """Return ``(switch_event, target_model)`` for the switch this request should wait on.

    Joins the switch already in flight, if any; otherwise starts a background
    switch to *requested_model*. Neither this check nor start_switch() awaits
    anything in between, so two concurrent requests cannot both launch a
    switch.
    """
    global _switch_target
    if not _switch_in_progress():
        await start_switch()
        _switch_target = requested_model
        task = asyncio.create_task(_background_switch(requested_model))
        _switch_tasks.add(task)
        task.add_done_callback(_switch_tasks.discard)
    return _switching_event, _switch_target


async def _reserve_queue_slot() -> asyncio.Event:
    """Take one of the ``_MAX_QUEUE_SIZE`` waiting slots, or raise 429 if none is free.

    The returned event is registered in ``_queue``; drain_queue() clears it
    when the switch finishes. Here it only counts the request against the limit.
    """
    async with _queue_lock:
        if len(_queue) >= _MAX_QUEUE_SIZE:
            raise HTTPException(status_code=429, detail="Server is busy, too many queued requests")
        slot = asyncio.Event()
        _queue.append(slot)
        return slot


async def _stream_fallbacks(request: Request, path: str, body: bytes, headers: dict):
    """Stream the reply from OpenRouter (when a key is set), else from the LXC SLM.

    Moves to the next backend only if the current one failed before sending
    anything, since splicing two replies would corrupt the stream. Ends with
    an ``error`` SSE frame when no backend answered.
    """
    bases = ([OPENROUTER_BASE] if OPENROUTER_API_KEY else []) + [FALLBACK_SLM_URL]
    for base in bases:
        url = f"{base}/{path}"
        sent_any = False
        try:
            async with httpx.AsyncClient(timeout=60.0) as client:
                async with client.stream(
                    request.method, url, content=body, headers=_headers_for(url, headers)
                ) as resp:
                    async for chunk in resp.aiter_bytes():
                        sent_any = True
                        yield chunk
            return
        except Exception:
            if sent_any:
                yield _sse_format("error", {"message": "Backend stream interrupted"})
                return
    yield _sse_format("error", {"message": "All backends unavailable"})


async def _stream_after_switch(
    request: Request,
    path: str,
    body: bytes,
    headers: dict,
    requested_model: str,
    switch_evt: asyncio.Event,
    target_model: Optional[str],
    slot: asyncio.Event,
):
    """Produce the SSE body for a chat request that started or joined a model switch.

    The stream proceeds in order:

    * ``model_switching`` progress frames;
    * ``: keepalive`` comments every ``_KEEPALIVE_INTERVAL`` seconds until
      the switch finishes;
    * the Main PC's reply.

    If the finished switch was loading a different model, a switch to
    *requested_model* is started (or joined) and waited on as well. When the
    total wait exceeds ``_QUEUE_TIMEOUT`` seconds, or the Main PC fails
    before sending anything, the request goes to the fallback chain.
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + _QUEUE_TIMEOUT
    progress = sse_progress_stream(switch_evt)
    try:
        async for frame in progress:
            yield frame

        ready = False
        while True:
            while not switch_evt.is_set():
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break
                try:
                    await asyncio.wait_for(
                        switch_evt.wait(),
                        timeout=min(_KEEPALIVE_INTERVAL, remaining),
                    )
                except asyncio.TimeoutError:
                    # SSE comment line: clients ignore it, but it keeps the
                    # connection from looking idle during a long model load.
                    yield ": keepalive\n\n"
            if not switch_evt.is_set():
                print(f"SWITCH WAIT TIMEOUT: profile={requested_model}", flush=True)
                break
            if target_model == requested_model:
                ready = True
                break
            # The switch we waited on loaded another model; now run ours.
            switch_evt, target_model = await _join_or_start_switch(requested_model)

        if ready:
            sent_any = False
            try:
                async with httpx.AsyncClient(timeout=60.0) as client:
                    async with client.stream(
                        request.method,
                        f"{MAIN_PC_BASE}/{path}",
                        content=body,
                        headers=headers,
                    ) as resp:
                        async for chunk in resp.aiter_bytes():
                            sent_any = True
                            yield chunk
                return
            except Exception as exc:
                print(
                    f"PROXY: Main PC stream failed after switch: {type(exc).__name__}: {exc}",
                    flush=True,
                )
                if sent_any:
                    # Appending another backend's output to a partial reply would corrupt it.
                    yield _sse_format("error", {"message": "Backend stream interrupted"})
                    return

        # Switch failed, timed out, or Main PC unreachable: use the fallback chain.
        yield _sse_format("error", {"message": "Backend unreachable, trying fallback..."})
        async for chunk in _stream_fallbacks(request, path, body, headers):
            yield chunk
    finally:
        # Free the admission slot if drain_queue() has not already done so.
        if slot in _queue:
            _queue.remove(slot)
        try:
            await progress.aclose()
        except Exception:
            pass


@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def proxy(
    request: Request,
    path: str,
    x_intelligence_level: str = Header(None),
):
    """Smart proxy with the full fallback chain: Sidecar → Main PC → OpenRouter → LXC.

    * Sidecar unreachable: route to OpenRouter (if keyed) or LXC, then the
      remaining fallbacks.
    * Circuit open: route to the Main PC if llama-server is running, else the
      fallbacks. No switch is started.
    * Chat/completion POST naming a non-active model: immediately return an
      SSE stream while a background switch loads the model (see
      ``_stream_after_switch``). Returns 429 if too many requests are
      already waiting.
    * Anything else: route to the active Main PC backend when one is running.

    NOTE(review): the switch path always answers with text/event-stream, even
    for clients that asked for a non-streamed JSON reply.
    """
    # Issue #6: the deprecated X-Intelligence-Level header is accepted but ignored.
    del x_intelligence_level

    # Sidecar admin endpoints have dedicated routes; never proxy them blindly.
    if path.startswith(("models/available", "models/switch", "models/status")):
        raise HTTPException(status_code=404, detail="Use the appropriate endpoint")

    body = await request.body()
    body_data = _parse_json_object(body)
    headers = _forwardable_request_headers(request)
    target_url: Optional[str] = None
    error: Optional[str] = None

    # Always query the Sidecar first, so recovery is detected even while the circuit is open.
    sidecar_status = await _fetch_sidecar_status()

    if sidecar_status is None:
        circuit_record_failure()
        error = "sidecar_down"
    elif not await circuit_breaker_check():
        # Circuit open after earlier failures: never start a switch, but keep
        # routing to the backend that is already loaded.
        error = "circuit_open"
        if sidecar_status.get("llama_server_running"):
            target_url = f"{MAIN_PC_BASE}/{path}"
    else:
        requested_model = body_data.get("model")
        if not isinstance(requested_model, str):
            requested_model = None
        active_profile = sidecar_status.get("active_profile")
        # Only real chat/completion POSTs may trigger a switch; probes and
        # other endpoints just read the current state.
        is_chat_request = request.method == "POST" and path in _CHAT_PATHS

        if requested_model and requested_model == active_profile:
            target_url = f"{MAIN_PC_BASE}/{path}"
        elif requested_model and is_chat_request:
            # Reserve a queue slot first, so an overloaded router can still
            # answer 429 before any streamed bytes are sent.
            slot = await _reserve_queue_slot()
            switch_evt, target_model = await _join_or_start_switch(requested_model)
            return StreamingResponse(
                _stream_after_switch(
                    request, path, body, headers, requested_model, switch_evt, target_model, slot
                ),
                media_type="text/event-stream",
            )
        elif active_profile and sidecar_status.get("llama_server_running"):
            target_url = f"{MAIN_PC_BASE}/{path}"

    # ── Primary target when the Sidecar path is unusable ─────────────────
    if target_url is None and error in ("sidecar_down", "circuit_open"):
        target_url = f"{OPENROUTER_BASE}/{path}" if OPENROUTER_API_KEY else f"{FALLBACK_SLM_URL}/{path}"
    if target_url is None:
        return Response(content="No valid target available", status_code=503)

    stream = _wants_stream(request, body_data)
    try:
        return await _forward(request, target_url, body, headers, stream)
    except Exception as e:
        print(
            f"PROXY EXCEPTION on primary {target_url}: {type(e).__name__}: {e}",
            flush=True,
        )

    # ── Fallback chain: Main PC → OpenRouter → LXC ──────────────────────
    if target_url.startswith(f"{MAIN_PC_BASE}/"):
        fallback_bases = ([OPENROUTER_BASE] if OPENROUTER_API_KEY else []) + [FALLBACK_SLM_URL]
    elif target_url.startswith(f"{OPENROUTER_BASE}/"):
        fallback_bases = [FALLBACK_SLM_URL]
    else:
        fallback_bases = []

    for fb_base in fallback_bases:
        try:
            # Health-check the backend before routing to it.
            async with httpx.AsyncClient(timeout=10.0) as client:
                probe = await client.get(f"{fb_base}/v1/models")
            if probe.status_code != 200:
                continue
            return await _forward(request, f"{fb_base}/{path}", body, headers, stream)
        except Exception:
            continue

    return Response(
        content="No valid target available (all backends down)",
        status_code=503,
    )
|