intelligence-router/main.py
root c491779248 Epic: Model Switching via Sidecar — Issues #2-#3
Issue #2: Manifest schema + Sidecar foundation
- sidecar/manifest.py: YAML manifest loading and profile validation
- sidecar/app.py: FastAPI sidecar service with /models/available, /models/status endpoints
- Router GET /v1/models: proxies to sidecar, returns OpenAI-compatible model list
- Tests: 12 manifest tests, 6 sidecar endpoint tests, 3 router tests (21 total)

Issue #3: Sidecar model switch + Router request queue
- Sidecar POST /models/switch: stops current llama-server, starts new one, polls for readiness
- Switch lock prevents concurrent switches (threading.Lock for TestClient compatibility)
- Router request queue: max 10 requests, 120s hard timeout, 429 when full
- Router automatic model detection: extracts model from chat body, matches against sidecar status
- Full proxy endpoint with Sidecar → Main PC routing and fallback chain
- Tests: 5 sidecar switch tests, 4 queue tests, 3 router integration tests (12 total)

Total: 33 tests, all passing
2026-06-15 00:49:24 +00:00

416 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import json
import os
import threading
import time
from contextlib import asynccontextmanager
from typing import Optional

import httpx
from dotenv import load_dotenv
from fastapi import FastAPI, Header, HTTPException, Request, Response
from fastapi.responses import JSONResponse, StreamingResponse
load_dotenv()
# ─── Configuration ───────────────────────────────────────────────────────────
def _env_base_url(name: str, default: str) -> str:
    """Read a backend base URL from the environment, normalised for joining.

    Trailing slashes are stripped before the trailing ``/v1`` segment is
    removed, so a value such as ``http://host:8080/v1/`` yields
    ``http://host:8080`` and ``f"{base}/{path}"`` does not produce ``//``.
    """
    return os.getenv(name, default).rstrip("/").removesuffix("/v1")


# Sidecar admin API (model listing / status / switch).
SIDECAR_URL = os.getenv("SIDECAR_URL", "http://10.0.4.11:8081").rstrip("/")
# Inference backends, stored WITHOUT the ``/v1`` suffix because proxied
# request paths are appended to them as-is.
MAIN_PC_BASE = _env_base_url("MAIN_PC_URL", "http://10.0.4.11:8080/v1")
FALLBACK_SLM_URL = _env_base_url("FALLBACK_SLM_URL", "http://10.0.4.200:8080/v1")
# OpenRouter is only used as a fallback when an API key is configured.
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "")
OPENROUTER_BASE = "https://openrouter.ai/api/v1"
print(f"SIDECAR_URL={SIDECAR_URL}")
print(f"MAIN_PC_BASE={MAIN_PC_BASE}")
print(f"FALLBACK_SLM_URL={FALLBACK_SLM_URL}")
# ─── Request Queue ───────────────────────────────────────────────────────────
# Upper bound on the number of requests parked in ``_queue`` at once.
_MAX_QUEUE_SIZE = 10
# Longest a parked request waits for its event, in seconds.
_QUEUE_TIMEOUT = 120
# Held by ``queue_request`` while it adds to or removes from ``_queue``.
_queue_lock = asyncio.Lock()
# One event per parked request; ``drain_queue`` sets them all and empties it.
_queue: list = []
async def queue_request() -> asyncio.Event:
    """Park the caller until the queue is drained.

    A fresh event is registered in ``_queue`` and awaited; ``drain_queue``
    sets every registered event, which releases the caller.

    Returns:
        The event that was registered for this caller (set on return).

    Raises:
        HTTPException: 429 when ``_MAX_QUEUE_SIZE`` requests are already
            parked, or when the event is not set within ``_QUEUE_TIMEOUT``
            seconds.
    """
    async with _queue_lock:
        if len(_queue) >= _MAX_QUEUE_SIZE:
            raise HTTPException(status_code=429, detail="Server is busy, too many queued requests")
        event = asyncio.Event()
        _queue.append(event)

    try:
        await asyncio.wait_for(event.wait(), timeout=_QUEUE_TIMEOUT)
    except asyncio.TimeoutError:
        async with _queue_lock:
            if event in _queue:
                _queue.remove(event)
        raise HTTPException(status_code=429, detail="Request timed out waiting for model switch")
    except asyncio.CancelledError:
        # The waiting task was cancelled (e.g. the client disconnected).
        # Give the slot back so abandoned requests cannot fill the queue.
        # Done without awaiting: a task that is being cancelled should not
        # suspend again, and this removal has no suspension point in it.
        if event in _queue:
            _queue.remove(event)
        raise
    return event
def drain_queue():
    """Release every parked request.

    Sets each event registered in ``_queue`` and then empties the list in
    place, so existing references to the list stay valid.

    No lock is taken: the function is synchronous and contains no ``await``,
    so coroutines on the same event loop cannot interleave with it. The lock
    that used to be created here was a new object on every call and therefore
    never excluded anyone.
    """
    for event in _queue:
        event.set()
    _queue.clear()
# ─── Circuit Breaker ────────────────────────────────────────────────────────
# Consecutive Sidecar failures after which the circuit opens.
MAX_RECOVERY_ATTEMPTS = 3
# Seconds an open circuit rejects Sidecar traffic before a probe is allowed.
CIRCUIT_COOLDOWN_SECONDS = 30.0
_recovery_attempts = 0
_circuit_open = False
# time.monotonic() reading taken when the circuit last opened.
_circuit_opened_at = 0.0
_circuit_lock = asyncio.Lock()


async def circuit_breaker_check() -> bool:
    """Report whether a Sidecar request may be attempted.

    Returns ``True`` while the circuit is closed. When the circuit is open
    and ``CIRCUIT_COOLDOWN_SECONDS`` have elapsed since it opened, the
    circuit is put back into service so the next Sidecar call acts as a
    probe. Without this, an open circuit could never close again, because
    ``circuit_reset`` only runs after a successful Sidecar call and an open
    circuit prevents every Sidecar call.

    The failure counter is left untouched on re-entry, so a single failed
    probe re-opens the circuit immediately.
    """
    global _circuit_open
    async with _circuit_lock:
        cooled_down = time.monotonic() - _circuit_opened_at >= CIRCUIT_COOLDOWN_SECONDS
        if _circuit_open and cooled_down:
            _circuit_open = False
            print("Circuit breaker HALF-OPEN: allowing a Sidecar probe")
        return not _circuit_open


def circuit_reset():
    """Close the circuit and clear the failure counter after a Sidecar success."""
    global _circuit_open, _recovery_attempts
    _circuit_open = False
    _recovery_attempts = 0


def circuit_record_failure():
    """Count one Sidecar failure; open the circuit at MAX_RECOVERY_ATTEMPTS.

    The opening time is recorded so ``circuit_breaker_check`` can allow a
    probe once the cooldown has passed.
    """
    global _circuit_open, _recovery_attempts, _circuit_opened_at
    _recovery_attempts += 1
    if _recovery_attempts >= MAX_RECOVERY_ATTEMPTS:
        _circuit_open = True
        _circuit_opened_at = time.monotonic()
        print(f"Circuit breaker OPENED after {_recovery_attempts} failures")
# ─── SSE Helpers ─────────────────────────────────────────────────────────────
def _sse_format(event: str, data: dict) -> str:
lines = [f"event: {event}"]
for key, value in data.items():
lines.append(f"data: {json.dumps(value)}")
lines.append("")
lines.append("")
return "\n".join(lines)
# ─── Router State ───────────────────────────────────────────────────────────
# Event representing the most recent model switch: ``None`` before any switch
# has started, unset while one is running, set once it has completed.
_switching_event: Optional[asyncio.Event] = None
# Held while ``_switching_event`` is inspected, replaced or set.
_switching_lock = threading.Lock()
async def start_switch():
    """Mark a model switch as running.

    Installs a new, unset event in ``_switching_event`` unless an unset one
    is already there, in which case the running switch is left as it is.
    """
    global _switching_event
    with _switching_lock:
        current = _switching_event
        already_running = current is not None and not current.is_set()
        if not already_running:
            _switching_event = asyncio.Event()
async def wait_for_switch():
    """Block until the running model switch completes.

    Returns:
        ``None`` straight away when no switch is running (no event yet, or
        the event is already set); otherwise the switch event, after it has
        been set.
    """
    with _switching_lock:
        pending = _switching_event
        idle = pending is None or pending.is_set()
    if idle:
        return None
    # Wait outside the lock so that complete_switch() can acquire it.
    await pending.wait()
    return pending
def complete_switch():
    """Mark the running model switch as finished.

    Sets ``_switching_event`` when it exists and is still unset, which wakes
    everything blocked in ``wait_for_switch``. Does nothing otherwise.
    """
    with _switching_lock:
        pending = _switching_event
        if pending is None or pending.is_set():
            return
        pending.set()
# ─── App ─────────────────────────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifespan hook: print one line at startup and one at shutdown."""
    print("Intelligence Router starting")
    yield  # suspended here for as long as the application is running
    print("Intelligence Router shutting down")


# The ASGI application that the route decorators in this module register on.
app = FastAPI(lifespan=lifespan)
# ─── GET /v1/models — Issue #2 ──────────────────────────────────────────────
@app.get("/v1/models")
async def get_models():
    """OpenAI-compatible ``/v1/models`` listing built from the Sidecar profiles.

    Fetches ``{SIDECAR_URL}/models/available`` and maps every profile that
    carries an ``id`` to an OpenAI model entry.

    Returns:
        ``{"object": "list", "data": [...]}`` on success, or a 503 JSON
        response with an empty ``data`` list when the Sidecar cannot be
        reached, answers with an error status, or does not return a JSON
        list. Previously an error reply from the Sidecar raised inside the
        list comprehension and surfaced as a 500.
    """
    unavailable = JSONResponse(
        status_code=503,
        content={"error": "Sidecar unavailable", "data": []},
    )
    async with httpx.AsyncClient(timeout=5.0) as client:
        try:
            resp = await client.get(f"{SIDECAR_URL}/models/available")
            # Treat 4xx/5xx from the Sidecar like an unreachable Sidecar.
            resp.raise_for_status()
            profiles = resp.json()
        except Exception:
            return unavailable

    if not isinstance(profiles, list):
        return unavailable

    models_data = [
        {"id": profile["id"], "object": "model", "owned_by": "sidecar"}
        for profile in profiles
        # Skip malformed entries instead of failing the whole listing.
        if isinstance(profile, dict) and "id" in profile
    ]
    return {"object": "list", "data": models_data}
# ─── GET /health ─────────────────────────────────────────────────────────────
@app.get("/health")
async def health():
    """Liveness endpoint: returns a constant payload without calling any backend."""
    return dict(status="router_online")
# ─── GET /models/status ──────────────────────────────────────────────────────
@app.get("/models/status")
async def router_model_status():
    """Proxy ``GET /models/status`` to the Sidecar.

    Returns:
        The Sidecar's JSON body. A successful Sidecar reply is returned
        as-is; an error reply keeps the Sidecar's status code instead of
        being reported as HTTP 200. A 503 JSON response is returned when the
        Sidecar cannot be reached or does not answer with JSON.
    """
    async with httpx.AsyncClient(timeout=5.0) as client:
        try:
            resp = await client.get(f"{SIDECAR_URL}/models/status")
            payload = resp.json()
        except Exception:
            return JSONResponse(
                status_code=503,
                content={"error": "Sidecar unavailable"},
            )

    if resp.is_success:
        return payload
    return JSONResponse(status_code=resp.status_code, content=payload)
# ─── POST /models/switch — Issue #3 ──────────────────────────────────────────
@app.post("/models/switch")
async def router_model_switch(request: dict):
    """Proxy ``POST /models/switch`` to the Sidecar.

    Args:
        request: JSON body; must contain a non-empty ``profile_id``.

    Returns:
        The Sidecar's JSON body. A successful Sidecar reply is returned
        as-is; an error reply keeps the Sidecar's status code instead of
        being reported as HTTP 200. A 503 JSON response describes the
        failure when the Sidecar cannot be reached or does not answer with
        JSON.

    Raises:
        HTTPException: 400 when ``profile_id`` is missing or empty.
    """
    profile_id = request.get("profile_id")
    if not profile_id:
        raise HTTPException(status_code=400, detail="profile_id is required")

    # Long timeout: the request stays open while the Sidecar performs the switch.
    async with httpx.AsyncClient(timeout=120.0) as client:
        try:
            resp = await client.post(
                f"{SIDECAR_URL}/models/switch",
                json={"profile_id": profile_id},
            )
            payload = resp.json()
        except Exception as e:
            return JSONResponse(
                status_code=503,
                content={"status": "error", "message": f"Sidecar error: {str(e)}"},
            )

    if resp.is_success:
        return payload
    return JSONResponse(status_code=resp.status_code, content=payload)
# ─── SSE Progress Stream Generator ───────────────────────────────────────────
async def sse_progress_stream(event: asyncio.Event, poll_interval: float = 2.0):
    """Yield SSE progress frames while a model switch is in progress.

    One ``model_switching`` frame is emitted per phase. After each frame the
    generator waits up to ``poll_interval`` seconds for ``event``; it wakes
    as soon as the event is set instead of always sleeping the full
    interval. If the event is already set when a phase frame has been
    emitted, a final "Switch complete" frame follows and the generator ends.
    After the last phase a "Processing your request..." frame is emitted
    whether or not the event has been set.

    Args:
        event: Set by the caller when the switch has finished.
        poll_interval: Maximum seconds to stay on one phase.
    """
    phases = (
        ("stopping", "Stopping current model..."),
        ("starting", "Loading new model..."),
        ("waiting", "Waiting for model to be ready..."),
    )
    for phase, msg in phases:
        yield _sse_format("model_switching", {"phase": phase, "message": msg})
        if event.is_set():
            yield _sse_format("model_switching", {"phase": "complete", "message": "Switch complete"})
            return
        try:
            await asyncio.wait_for(event.wait(), timeout=poll_interval)
        except asyncio.TimeoutError:
            # Still switching; move on to the next phase message.
            pass
    yield _sse_format("model_switching", {"phase": "complete", "message": "Processing your request..."})
# ─── Proxy Endpoint — Issues #2–#7 ──────────────────────────────────────────
# Paths served by the router's own model-admin endpoints; never proxied.
_ADMIN_PATH_PREFIXES = ("models/available", "models/switch", "models/status")
# Accept values for which the upstream body is relayed chunk by chunk.
_STREAMING_MEDIA_TYPES = ("text/event-stream", "application/x-ndjson")
# Request headers that must not be copied upstream: httpx computes host and
# content-length itself, and dropping accept-encoding lets httpx advertise
# only the encodings it is able to decode.
_SKIPPED_REQUEST_HEADERS = frozenset({"host", "content-length", "accept-encoding"})
# Upstream response headers that no longer describe the relayed body (httpx
# hands over decoded content) or that apply to the upstream connection only.
_SKIPPED_RESPONSE_HEADERS = frozenset(
    {"content-length", "content-encoding", "transfer-encoding", "connection", "keep-alive"}
)
# How many times one request re-evaluates routing after waiting for a switch.
_MAX_SWITCH_ROUNDS = 3


def _parse_json_object(body: bytes) -> dict:
    """Return the request body as a dict, or ``{}`` if it is not a JSON object."""
    if not body:
        return {}
    try:
        payload = json.loads(body)
    except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
        return {}
    return payload if isinstance(payload, dict) else {}


def _backend_url(base: str, path: str, query: str = "") -> str:
    """Join a backend base URL with the proxied path and query string.

    Bases that already end in ``/v1`` (OpenRouter) would otherwise receive
    ``/v1/v1/...`` because proxied paths start with ``v1/``.
    """
    if base.endswith("/v1") and (path == "v1" or path.startswith("v1/")):
        path = path[2:].lstrip("/")
    url = f"{base}/{path}"
    return f"{url}?{query}" if query else url


def _upstream_headers(request_headers, api_key: Optional[str]) -> dict:
    """Build the header set for one backend, without mutating shared state."""
    headers = {
        name: value
        for name, value in request_headers.items()
        if name.lower() not in _SKIPPED_REQUEST_HEADERS
    }
    if api_key:
        # Replace any client credential; built per backend so the key never
        # leaks into a later fallback attempt.
        headers["authorization"] = f"Bearer {api_key}"
    return headers


def _relay_headers(upstream_headers) -> dict:
    """Filter upstream response headers down to those safe to pass on."""
    return {
        name: value
        for name, value in upstream_headers.items()
        if name.lower() not in _SKIPPED_RESPONSE_HEADERS
    }


def _switch_in_progress() -> bool:
    """True while a router-initiated model switch has not completed."""
    pending = _switching_event
    return pending is not None and not pending.is_set()


async def _fetch_sidecar_status() -> Optional[dict]:
    """Return the Sidecar status document, or ``None`` on any failure."""
    try:
        async with httpx.AsyncClient(timeout=3.0) as client:
            resp = await client.get(f"{SIDECAR_URL}/models/status")
            if resp.status_code != 200:
                return None
            status = resp.json()
    except Exception:
        # Boundary with an external service: every failure means "down".
        return None
    return status if isinstance(status, dict) else None


async def _request_switch(profile_id: str) -> Optional[str]:
    """Ask the Sidecar to switch models; return ``None`` or an error tag.

    The switch is always marked complete and the queue drained afterwards,
    including on failure or cancellation, so parked requests are released.
    """
    await start_switch()
    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            resp = await client.post(
                f"{SIDECAR_URL}/models/switch",
                json={"profile_id": profile_id},
            )
        result = resp.json()
    except Exception as exc:
        circuit_record_failure()
        return f"switch_error: {exc}"
    finally:
        complete_switch()
        drain_queue()
    if isinstance(result, dict) and result.get("status") == "ready":
        return None
    return "switch_failed"


async def _resolve_route(requested_model: Optional[str]) -> Optional[str]:
    """Decide whether the Main PC can serve the request.

    Returns ``None`` when the Main PC should be tried first, otherwise a
    short error tag explaining why the fallback backends are used.

    Raises:
        HTTPException: 429 from ``queue_request`` when too many requests are
            already waiting for a switch, or the wait times out.
    """
    if not await circuit_breaker_check():
        return "circuit_open"

    for _ in range(_MAX_SWITCH_ROUNDS):
        status = await _fetch_sidecar_status()
        if status is None:
            circuit_record_failure()
            return "sidecar_down"
        circuit_reset()

        if _switch_in_progress():
            # Another request is driving a switch: park until it finishes,
            # then look at the Sidecar state again. There is no suspension
            # point between this check and queue_request registering its
            # event, so the drain cannot be missed.
            await queue_request()
            continue

        if requested_model is None or status.get("active_profile") == requested_model:
            return None

        # No switch is running and the wrong model is loaded: this request
        # performs the switch. The earlier start_switch()/wait_for_switch()
        # sequence made the first request wait on an event only it could set.
        return await _request_switch(requested_model)

    return "switch_failed"


async def _forward(method: str, url: str, body: bytes, headers: dict, stream: bool) -> Response:
    """Send the request to one backend and wrap its reply.

    Raises:
        httpx.HTTPError: when the backend cannot be reached or times out
            before a response has started.
    """
    if not stream:
        async with httpx.AsyncClient(timeout=60.0) as client:
            upstream = await client.request(
                method=method, url=url, content=body, headers=headers,
            )
        return Response(
            content=upstream.content,
            status_code=upstream.status_code,
            headers=_relay_headers(upstream.headers),
        )

    # Streaming: the upstream response is opened BEFORE returning so that
    # connection failures can still fall back, and the client stays open
    # until the relay generator has finished.
    client = httpx.AsyncClient(timeout=60.0)
    try:
        upstream = await client.send(
            client.build_request(method, url, content=body, headers=headers),
            stream=True,
        )
    except BaseException:
        await client.aclose()
        raise

    async def relay():
        try:
            async for chunk in upstream.aiter_bytes():
                yield chunk
        finally:
            await upstream.aclose()
            await client.aclose()

    return StreamingResponse(
        relay(),
        status_code=upstream.status_code,
        headers=_relay_headers(upstream.headers),
    )


@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def proxy(
    request: Request,
    path: str,
    x_intelligence_level: str = Header(None),
):
    """
    Smart Proxy with full fallback chain.
    Sidecar → Main PC → OpenRouter → LXC

    The Sidecar is asked which model is active. If the request names a
    different model, the request triggers a switch; requests arriving during
    a switch are parked via ``queue_request`` and re-evaluated afterwards.
    Requests that name no model use whatever model is active.

    Backends are then tried in order: Main PC (only when routing succeeded),
    OpenRouter (only when an API key is configured), then the fallback SLM.
    Only transport failures move on to the next backend; an HTTP error
    status from a backend is relayed to the client unchanged.

    NOTE(review): parked requests wait and then receive a normal proxied
    response. The earlier branch that interleaved ``sse_progress_stream``
    frames with upstream bytes could never execute; whether queued streaming
    clients should receive in-band progress events needs a protocol decision.

    Raises:
        HTTPException: 404 for model-admin paths, 429 when the switch queue
            is full or the wait for a switch times out.
    """
    # Issue #6: the x-intelligence-level header is accepted but ignored.
    del x_intelligence_level

    if path.startswith(_ADMIN_PATH_PREFIXES):
        raise HTTPException(status_code=404, detail="Use the appropriate endpoint")

    body = await request.body()
    payload = _parse_json_object(body)
    model = payload.get("model")
    requested_model = model if isinstance(model, str) and model else None

    route_error = await _resolve_route(requested_model)
    if route_error is not None:
        print(f"Sidecar route unavailable ({route_error}); using fallback backends")

    # Ordered candidates: (base URL, API key to inject or None).
    backends = []
    if route_error is None:
        backends.append((MAIN_PC_BASE, None))
    if OPENROUTER_API_KEY:
        backends.append((OPENROUTER_BASE, OPENROUTER_API_KEY))
    backends.append((FALLBACK_SLM_URL, None))

    accept_header = request.headers.get("accept", "")
    wants_stream = payload.get("stream") is True or any(
        media_type in accept_header for media_type in _STREAMING_MEDIA_TYPES
    )

    for base, api_key in backends:
        target = _backend_url(base, path, request.url.query)
        try:
            return await _forward(
                request.method,
                target,
                body,
                _upstream_headers(request.headers, api_key),
                wants_stream,
            )
        except httpx.HTTPError as exc:
            print(f"Backend {base} failed: {exc!r}")

    return Response(content="No valid target available (all backends down)", status_code=503)