intelligence-router/main.py
root b3ac21b2c0 fix: first request no longer blocks on model switch — uses background task + SSE
The FIRST request that triggers a model switch was blocking the HTTP response
for 10-30s while waiting for the sidecar to load the model. Hermes Desktop's
client timed out during this wait, causing 'nothing happens' on new session.

Fix: refactored the proxy handler so ALL requests during a model switch use
the same SSE streaming pattern (immediate 200, progress events, then actual
response piped through after switch completes). The switch now runs as a
background asyncio task via create_task().

- Added _background_switch() — runs POST /models/switch in background task
  with complete_switch() + drain_queue() in finally block
- All switch-triggering requests go through queue_request() + StreamingResponse
- SSE generator now falls through to OpenRouter/LXC if Main PC unreachable
  (switch failure case) instead of hanging indefinitely

Sidecar fixes from previous commit:
- _kill_llama_server() is now async with proper await on process termination
- _switch_lock changed from threading.Lock to asyncio.Lock()
2026-06-18 00:10:48 +00:00

703 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import asyncio
import json
import threading
from contextlib import asynccontextmanager
from typing import Optional
import httpx
from fastapi import FastAPI, Request, Response, Header, HTTPException
from fastapi.responses import StreamingResponse, JSONResponse
from dotenv import load_dotenv
load_dotenv()
# ─── Configuration ───────────────────────────────────────────────────────────
SIDECAR_URL = os.getenv("SIDECAR_URL", "http://10.0.4.11:8080")
MAIN_PC_BASE = os.getenv("MAIN_PC_URL", "http://10.0.4.11:8080/v1").removesuffix("/v1")
FALLBACK_SLM_URL = os.getenv("FALLBACK_SLM_URL", "http://10.0.4.200:8080/v1").removesuffix("/v1")
OPENROUTER_API_KEY=os.getenv("OPENROUTER_API_KEY", "")
OPENROUTER_BASE = "https://openrouter.ai"
print(f"SIDECAR_URL={SIDECAR_URL}")
print(f"MAIN_PC_BASE={MAIN_PC_BASE}")
print(f"FALLBACK_SLM_URL={FALLBACK_SLM_URL}")
# ─── Request Queue ───────────────────────────────────────────────────────────
_MAX_QUEUE_SIZE = 10
_QUEUE_TIMEOUT = 120 # seconds
_queue_lock = asyncio.Lock()
_queue: list = []
async def queue_request() -> asyncio.Event:
"""Add a request to the queue. Raises 429 if full."""
global _queue
async with _queue_lock:
if len(_queue) >= _MAX_QUEUE_SIZE:
raise HTTPException(status_code=429, detail="Server is busy, too many queued requests")
event = asyncio.Event()
_queue.append(event)
try:
await asyncio.wait_for(event.wait(), timeout=_QUEUE_TIMEOUT)
return event
except asyncio.TimeoutError:
async with _queue_lock:
if event in _queue:
_queue.remove(event)
raise HTTPException(status_code=429, detail="Request timed out waiting for model switch")
def drain_queue():
"""Signal all queued requests that the model is ready."""
lock = threading.Lock()
with lock:
for event in _queue:
event.set()
_queue.clear()
# ─── Circuit Breaker ────────────────────────────────────────────────────────
MAX_RECOVERY_ATTEMPTS = 3
_recovery_attempts = 0
_circuit_open = False
_circuit_lock = asyncio.Lock()
async def circuit_breaker_check() -> bool:
"""Check if the circuit allows a Sidecar request."""
global _circuit_open
async with _circuit_lock:
return not _circuit_open
def circuit_reset():
"""Reset circuit breaker after a successful Sidecar interaction."""
global _circuit_open, _recovery_attempts
_circuit_open = False
_recovery_attempts = 0
def circuit_record_failure():
"""Record a Sidecar failure. Opens circuit after MAX_RECOVERY_ATTEMPTS."""
global _circuit_open, _recovery_attempts
_recovery_attempts += 1
if _recovery_attempts >= MAX_RECOVERY_ATTEMPTS:
_circuit_open = True
print(f"Circuit breaker OPENED after {_recovery_attempts} failures")
# ─── SSE Helpers ─────────────────────────────────────────────────────────────
def _sse_format(event: str, data: dict) -> str:
lines = [f"event: {event}"]
lines.append(f"data: {json.dumps(data)}")
lines.append("")
lines.append("")
return "\n".join(lines)
# ─── Router State ───────────────────────────────────────────────────────────
_switching_event: Optional[asyncio.Event] = None
_switching_lock = threading.Lock()
async def start_switch():
"""Signal that a switch has started. Creates an unset event to track the switch."""
global _switching_event
with _switching_lock:
if _switching_event is None or _switching_event.is_set():
_switching_event = asyncio.Event()
async def wait_for_switch():
"""Wait for the current switch to complete. Returns None if no active switch.
Returns None immediately if no switch is in progress (event is None or set).
If a switch IS in progress, waits for it to complete and then clears the event.
"""
global _switching_event
with _switching_lock:
if _switching_event is None or _switching_event.is_set():
# No switch happening, or already done
return None
evt = _switching_event
# A switch IS in progress — wait for it
await evt.wait()
# Switch is done — clear for next time
with _switching_lock:
if _switching_event is not None and _switching_event.is_set():
_switching_event = None
def complete_switch():
"""Mark the current switch as complete. Signals waiting requests."""
global _switching_event
with _switching_lock:
if _switching_event is not None and not _switching_event.is_set():
_switching_event.set()
async def _background_switch(requested_model: str):
"""Run a model switch in the background.
The sidecar POST is awaited but the caller gets an immediate SSE stream
so Hermes Desktop doesn't timeout waiting for the first response.
Called via asyncio.create_task() so it runs concurrently with the
SSE stream being sent to the client.
"""
try:
async with httpx.AsyncClient(timeout=120.0) as client:
switch_resp = await client.post(
f"{SIDECAR_URL}/models/switch",
json={"profile_id": requested_model},
)
switch_result = switch_resp.json()
if switch_result.get("status") == "ready":
print(
f"SWITCH SUCCESS: profile={requested_model}",
flush=True,
)
else:
circuit_record_failure()
print(
f"SWITCH FAILED: profile={requested_model}, "
f"status={switch_result.get('status')}, "
f"message={switch_result.get('message', '(no message)')}",
flush=True,
)
except Exception as e:
circuit_record_failure()
print(
f"SWITCH EXCEPTION: profile={requested_model}, "
f"error={type(e).__name__}: {e}",
flush=True,
)
finally:
# Signal all queued requests so they can proceed (and fall
# through to the fallback chain if the switch failed).
complete_switch()
drain_queue()
# ─── App ─────────────────────────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
print("Intelligence Router starting")
yield
print("Intelligence Router shutting down")
app = FastAPI(lifespan=lifespan)
# ─── GET /v1/models — Issue #2 ──────────────────────────────────────────────
@app.get("/v1")
async def v1_root():
"""OpenAI API root — return basic info for Hermes Desktop WebUI probe."""
return {"object": "list", "data": []}
@app.get("/v1/models")
async def get_models():
"""OpenAI-compatible /v1/models endpoint proxying to Sidecar."""
async with httpx.AsyncClient(timeout=5.0) as client:
try:
resp = await client.get(f"{SIDECAR_URL}/models/available")
profiles = resp.json()
except Exception:
return JSONResponse(
status_code=503,
content={"error": "Sidecar unavailable", "data": []},
)
models_data = [
{"id": p["id"], "object": "model", "owned_by": "sidecar"}
for p in profiles
]
return {"object": "list", "data": models_data}
# ─── GET /health ─────────────────────────────────────────────────────────────
@app.get("/health")
async def health():
return {"status": "router_online"}
# ─── Hermes Desktop Probe Endpoints ──────────────────────────────────────────
# These endpoints are probed by Hermes Desktop to validate/identify the
# provider before allowing model switching. Without them the desktop
# returns 503 and refuses to switch models.
@app.get("/v1/models/{model_id:path}")
async def get_single_model(model_id: str):
"""OpenAI-compatible single model query. Proxied via Sidecar model list."""
async with httpx.AsyncClient(timeout=5.0) as client:
try:
resp = await client.get(f"{SIDECAR_URL}/models/available")
profiles = resp.json()
except Exception:
return JSONResponse(
status_code=503,
content={"error": "Sidecar unavailable", "data": []},
)
for p in profiles:
if p.get("id") == model_id:
return {"id": p["id"], "object": "model", "owned_by": "sidecar"}
return JSONResponse(status_code=404, content={"error": "model not found", "id": model_id})
@app.get("/api/tags")
async def ollama_tags():
"""Ollama-compatible model list for Hermes Desktop discovery."""
async with httpx.AsyncClient(timeout=5.0) as client:
try:
resp = await client.get(f"{SIDECAR_URL}/models/available")
profiles = resp.json()
except Exception:
return JSONResponse(content={"models": []})
models = []
for p in profiles:
models.append({
"name": p.get("id", ""),
"model": p.get("id", ""),
"modified_at": "2025-01-01T00:00:00Z",
"size": 0,
"digest": "",
"details": {"format": "gguf", "family": p.get("name", "llm")},
})
return {"models": models}
@app.get("/api/show")
async def ollama_show_get(model: str = ""):
"""Ollama-compatible model info for Hermes Desktop discovery (GET variant).
Some Hermes Desktop versions probe /api/show via GET with a ?model= parameter.
"""
return await _ollama_show_lookup(model)
@app.post("/api/show")
async def ollama_show_post(request: Request):
"""Ollama-compatible model info for Hermes Desktop discovery (POST variant)."""
body = await request.body()
body_data = json.loads(body) if body else {}
model_name = body_data.get("model", "")
return await _ollama_show_lookup(model_name)
async def _ollama_show_lookup(model_name: str):
"""Shared logic for Ollama /api/show model info lookup.
When model_name is empty string (Hermes Desktop probe with no model field),
returns the currently-active profile's info so the desktop can determine
the correct context size. Previously returned 404, causing Hermes Desktop
to default to 256k context.
"""
async with httpx.AsyncClient(timeout=5.0) as client:
try:
resp = await client.get(f"{SIDECAR_URL}/models/available")
profiles = resp.json()
status_resp = await client.get(f"{SIDECAR_URL}/models/status")
status = status_resp.json()
except Exception:
return JSONResponse(status_code=404, content={"error": "model not found"})
# If no model specified, return the currently-active profile's info
active_id = status.get("active_profile")
if not model_name and active_id:
for p in profiles:
if p.get("id") == active_id:
flags = p.get("flags", {})
ctx_size = str(flags.get("ctx-size", flags.get("n_ctx", "4096")))
return {
"modelfile": "",
"parameters": f"num_ctx {ctx_size}",
"template": "",
"details": {
"format": "gguf",
"family": p.get("name", "llm"),
"parameter_size": ctx_size,
},
"model_info": {"id": p.get("id", "")},
}
for p in profiles:
if p.get("id") == model_name:
# Extract actual context size from the profile's flags
flags = p.get("flags", {})
ctx_size = str(flags.get("ctx-size", flags.get("n_ctx", "4096")))
return {
"modelfile": "",
"parameters": f"num_ctx {ctx_size}",
"template": "",
"details": {
"format": "gguf",
"family": p.get("name", "llm"),
"parameter_size": ctx_size,
},
"model_info": {"id": p.get("id", "")},
}
return JSONResponse(status_code=404, content={"error": "model not found"})
@app.get("/api/v1/models")
async def ollama_v1_models():
"""Ollama /api/v1/models redirect — return same list as /v1/models."""
return await get_models()
@app.get("/v1/props")
async def llama_cpp_props():
"""llama.cpp discovery endpoint for Hermes Desktop."""
async with httpx.AsyncClient(timeout=3.0) as client:
try:
resp = await client.get(f"{SIDECAR_URL}/models/status")
status = resp.json()
except Exception:
status = {"active_profile": None, "llama_server_running": False}
# Report the currently-running server version / capabilities
return {
"props": {
"version": 1,
"total_slots": 1,
"chat_endpoint": "/v1/chat/completions",
"completion_endpoint": "/v1/completions",
"embedding_endpoint": "/v1/embeddings",
"rerank_endpoint": "",
"health_endpoint": "/health",
},
"active_profile": status.get("active_profile"),
"server_running": status.get("llama_server_running", False),
}
@app.get("/props")
async def llm_props():
"""Legacy llama.cpp discovery endpoint (same as /v1/props)."""
return await llama_cpp_props()
@app.get("/version")
async def llm_version():
"""llama.cpp version endpoint for Hermes Desktop."""
return {"version": "0.2.0", "build": "router-proxy", "commit": "intelligence-router"}
# ─── GET /models/status ──────────────────────────────────────────────────────
@app.get("/models/status")
async def router_model_status():
"""Proxy to Sidecar /models/status."""
async with httpx.AsyncClient(timeout=5.0) as client:
try:
resp = await client.get(f"{SIDECAR_URL}/models/status")
return resp.json()
except Exception:
return JSONResponse(
status_code=503,
content={"error": "Sidecar unavailable"},
)
# ─── POST /models/switch — Issue #3 ──────────────────────────────────────────
@app.post("/models/switch")
async def router_model_switch(request: dict):
"""Proxy to Sidecar /models/switch."""
profile_id = request.get("profile_id")
if not profile_id:
raise HTTPException(status_code=400, detail="profile_id is required")
async with httpx.AsyncClient(timeout=120.0) as client:
try:
resp = await client.post(
f"{SIDECAR_URL}/models/switch",
json={"profile_id": profile_id},
)
return resp.json()
except Exception as e:
return JSONResponse(
status_code=503,
content={"status": "error", "message": f"Sidecar error: {str(e)}"},
)
# ─── SSE Progress Stream Generator ───────────────────────────────────────────
async def sse_progress_stream(event: asyncio.Event):
"""Generate SSE events while a model switch is in progress."""
phases = [
("stopping", "Stopping current model..."),
("starting", "Loading new model..."),
("waiting", "Waiting for model to be ready..."),
]
for phase, msg in phases:
if event.is_set():
yield _sse_format("model_switching", {"phase": phase, "message": msg})
yield _sse_format("model_switching", {"phase": "complete", "message": "Switch complete"})
return
yield _sse_format("model_switching", {"phase": phase, "message": msg})
await asyncio.sleep(2)
yield _sse_format("model_switching", {"phase": "complete", "message": "Processing your request..."})
# ─── Proxy Endpoint — Issues #2#7 ───────────────────────────────────────────
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def proxy(
request: Request,
path: str,
x_intelligence_level: str = Header(None),
):
"""
Smart Proxy with full fallback chain.
Sidecar → Main PC → OpenRouter → LXC
"""
# Issue #6: Remove deprecated x-intelligence-level routing
del x_intelligence_level # type: ignore[unused-coroutine]
# Skip proxy for known sidecar admin endpoints
if path.startswith("models/available") or \
path.startswith("models/switch") or \
path.startswith("models/status"):
raise HTTPException(status_code=404, detail="Use the appropriate endpoint")
# ── Determine target URL ──────────────────────────────────────────────
target_url: Optional[str] = None
error: Optional[str] = None
sidecar_status = None
# Always query the sidecar first (to detect recovery even when circuit is open)
async with httpx.AsyncClient(timeout=3.0) as client:
try:
resp = await client.get(f"{SIDECAR_URL}/models/status")
if resp.status_code == 200:
sidecar_status = resp.json()
circuit_reset()
except Exception:
pass # Handled below
if sidecar_status is None:
circuit_record_failure()
error = "sidecar_down"
elif not await circuit_breaker_check():
# Sidecar is up but circuit is open from prior switch failures
# Only block the switch — allow routing to already-active backend
error = "circuit_open"
if sidecar_status.get("llama_server_running"):
target_url = f"{MAIN_PC_BASE}/{path}"
else:
# Both sidecar reachable and circuit closed — proceed normally
body = await request.body()
body_data = json.loads(body) if body else {}
requested_model = body_data.get("model")
# Only trigger model switches for actual chat/completion POST requests.
# GET probes, /api/show lookups, and other non-chat endpoints should
# never trigger a switch — they just read current state.
is_chat_request = (
request.method == "POST"
and path in ("v1/chat/completions", "v1/completions")
)
if requested_model and sidecar_status.get("active_profile") == requested_model:
target_url = f"{MAIN_PC_BASE}/{path}"
elif requested_model and is_chat_request:
# All requests during a model switch get an immediate SSE streaming
# response so clients (Hermes Desktop) don't timeout while waiting
# for the model to load (10-30s). The switch runs in a background
# task; the SSE stream yields progress events, then pipes through
# the actual response once the backend model is ready.
current_switch = await wait_for_switch()
if current_switch is None:
# No switch in progress — start one in the background
await start_switch()
asyncio.create_task(_background_switch(requested_model))
# Queue this request — signals when switch completes
try:
wait_evt = await queue_request()
except HTTPException as he:
raise
# Build request headers once
req_headers = dict(request.headers)
req_headers.pop("host", None)
async def stream_with_sse():
sse_gen = sse_progress_stream(wait_evt)
try:
await wait_evt.wait()
async for sse_chunk in sse_gen:
yield sse_chunk
# Send actual request to Main PC
async with httpx.AsyncClient(timeout=60.0) as c:
async with c.stream(
request.method,
f"{MAIN_PC_BASE}/{path}",
content=body,
headers=req_headers,
) as resp:
async for chunk in resp.aiter_bytes():
yield chunk
except Exception:
# Main PC unreachable (switch failed or server died) —
# try fallback chain
yield _sse_format(
"error",
{"message": "Backend unreachable, trying fallback..."},
)
# Try OpenRouter
if OPENROUTER_API_KEY:
try:
fb_headers = dict(req_headers)
fb_headers["Authorization"] = f"Bearer {OPENROUTER_API_KEY}"
async with httpx.AsyncClient(timeout=60.0) as c:
async with c.stream(
request.method,
f"{OPENROUTER_BASE}/{path}",
content=body,
headers=fb_headers,
) as resp:
async for chunk in resp.aiter_bytes():
yield chunk
return
except Exception:
pass
# Fallback to LXC SLM
try:
async with httpx.AsyncClient(timeout=60.0) as c:
async with c.stream(
request.method,
f"{FALLBACK_SLM_URL}/{path}",
content=body,
headers=req_headers,
) as resp:
async for chunk in resp.aiter_bytes():
yield chunk
except Exception:
yield _sse_format(
"error",
{"message": "All backends unavailable"},
)
finally:
try:
await sse_gen.aclose()
except Exception:
pass
return StreamingResponse(
stream_with_sse(),
media_type="text/event-stream",
)
else:
# No model in request body (probe/GET/non-chat request) —
# route to the currently active backend when available,
# or fall through to the fallback chain.
if sidecar_status.get("active_profile") and sidecar_status.get("llama_server_running"):
target_url = f"{MAIN_PC_BASE}/{path}"
# ── Fallback chain ────────────────────────────────────────────────────
if target_url is None:
if error in ("sidecar_down", "circuit_open", "switch_failed"):
if OPENROUTER_API_KEY:
target_url = f"{OPENROUTER_BASE}/{path}"
else:
target_url = f"{FALLBACK_SLM_URL}/{path}"
if not target_url:
return Response(content="No valid target available", status_code=503)
# ── Prepare request ───────────────────────────────────────────────────
body = await request.body()
headers = dict(request.headers)
headers.pop("host", None)
headers.pop("content-length", None)
# ── Execute request with fallback ─────────────────────────────────────
exec_error: Optional[Exception] = None
async def execute(target: str) -> Optional[Response]:
async with httpx.AsyncClient(timeout=60.0) as client:
accept_header = request.headers.get("accept", "")
if "text/event-stream" in accept_header or "application/x-ndjson" in accept_header:
async def gen():
async with client.stream(
request.method, target,
content=body, headers=headers,
) as resp:
if resp.status_code != 200:
print(f"PROXY: {target} returned {resp.status_code} during SSE stream", flush=True)
async for chunk in resp.aiter_bytes():
yield chunk
return StreamingResponse(gen(), status_code=200)
resp = await client.request(
method=request.method,
url=target,
content=body,
headers=headers,
)
if resp.status_code != 200:
body_preview = resp.content[:500].decode("utf-8", errors="replace")
print(
f"PROXY: {request.method} {target} returned {resp.status_code}: {body_preview}",
flush=True,
)
return Response(
content=resp.content,
status_code=resp.status_code,
headers=dict(resp.headers),
)
primary_result = None
try:
primary_result = await execute(target_url)
except Exception as e:
print(
f"PROXY EXCEPTION on primary {target_url}: {type(e).__name__}: {e}",
flush=True,
) # Falls through to fallback chain
if primary_result is not None:
return primary_result
# ── Fallback chain: Main PC → OpenRouter → LXC ──────────────────────
fallback_order = []
# Determine which backends are still viable
if target_url.startswith(MAIN_PC_BASE):
if OPENROUTER_API_KEY:
fallback_order.append((OPENROUTER_BASE, OPENROUTER_API_KEY))
fallback_order.append((FALLBACK_SLM_URL, None))
elif target_url.startswith(OPENROUTER_BASE):
fallback_order.append((FALLBACK_SLM_URL, None))
for fb_base, fb_key in fallback_order:
# Check health before routing
async with httpx.AsyncClient(timeout=10.0) as client:
try:
resp = await client.get(f"{fb_base}/v1/models")
if resp.status_code != 200:
continue
fb_url = f"{fb_base}/{path}"
if fb_key:
headers["Authorization"] = f"Bearer {fb_key}"
result = await execute(fb_url)
if result is not None:
return result
except Exception:
continue
return Response(
content="No valid target available (all backends down)",
status_code=503,
)