import os import asyncio import json import threading from contextlib import asynccontextmanager from typing import Optional import httpx from fastapi import FastAPI, Request, Response, Header, HTTPException from fastapi.responses import StreamingResponse, JSONResponse from dotenv import load_dotenv load_dotenv() # ─── Configuration ─────────────────────────────────────────────────────────── SIDECAR_URL = os.getenv("SIDECAR_URL", "http://10.0.4.11:8080") MAIN_PC_BASE = os.getenv("MAIN_PC_URL", "http://10.0.4.11:8080/v1").removesuffix("/v1") FALLBACK_SLM_URL = os.getenv("FALLBACK_SLM_URL", "http://10.0.4.200:8080/v1").removesuffix("/v1") OPENROUTER_API_KEY=os.getenv("OPENROUTER_API_KEY", "") OPENROUTER_BASE = "https://openrouter.ai" print(f"SIDECAR_URL={SIDECAR_URL}") print(f"MAIN_PC_BASE={MAIN_PC_BASE}") print(f"FALLBACK_SLM_URL={FALLBACK_SLM_URL}") # ─── Request Queue ─────────────────────────────────────────────────────────── _MAX_QUEUE_SIZE = 10 _QUEUE_TIMEOUT = 120 # seconds _queue_lock = asyncio.Lock() _queue: list = [] async def queue_request() -> asyncio.Event: """Add a request to the queue. Raises 429 if full.""" global _queue async with _queue_lock: if len(_queue) >= _MAX_QUEUE_SIZE: raise HTTPException(status_code=429, detail="Server is busy, too many queued requests") event = asyncio.Event() _queue.append(event) try: await asyncio.wait_for(event.wait(), timeout=_QUEUE_TIMEOUT) return event except asyncio.TimeoutError: async with _queue_lock: if event in _queue: _queue.remove(event) raise HTTPException(status_code=429, detail="Request timed out waiting for model switch") def drain_queue(): """Signal all queued requests that the model is ready.""" lock = threading.Lock() with lock: for event in _queue: event.set() _queue.clear() # ─── Circuit Breaker ──────────────────────────────────────────────────────── MAX_RECOVERY_ATTEMPTS = 3 _recovery_attempts = 0 _circuit_open = False _circuit_lock = asyncio.Lock() async def circuit_breaker_check() -> bool: """Check if the circuit allows a Sidecar request.""" global _circuit_open async with _circuit_lock: return not _circuit_open def circuit_reset(): """Reset circuit breaker after a successful Sidecar interaction.""" global _circuit_open, _recovery_attempts _circuit_open = False _recovery_attempts = 0 def circuit_record_failure(): """Record a Sidecar failure. Opens circuit after MAX_RECOVERY_ATTEMPTS.""" global _circuit_open, _recovery_attempts _recovery_attempts += 1 if _recovery_attempts >= MAX_RECOVERY_ATTEMPTS: _circuit_open = True print(f"Circuit breaker OPENED after {_recovery_attempts} failures") # ─── SSE Helpers ───────────────────────────────────────────────────────────── def _sse_format(event: str, data: dict) -> str: lines = [f"event: {event}"] lines.append(f"data: {json.dumps(data)}") lines.append("") lines.append("") return "\n".join(lines) # ─── Router State ─────────────────────────────────────────────────────────── _switching_event: Optional[asyncio.Event] = None _switching_lock = threading.Lock() async def start_switch(): """Signal that a switch has started. Creates an unset event to track the switch.""" global _switching_event with _switching_lock: if _switching_event is None or _switching_event.is_set(): _switching_event = asyncio.Event() async def wait_for_switch(): """Wait for the current switch to complete. Returns None if no active switch. Returns None immediately if no switch is in progress (event is None or set). If a switch IS in progress, waits for it to complete and then clears the event. """ global _switching_event with _switching_lock: if _switching_event is None or _switching_event.is_set(): # No switch happening, or already done return None evt = _switching_event # A switch IS in progress — wait for it await evt.wait() # Switch is done — clear for next time with _switching_lock: if _switching_event is not None and _switching_event.is_set(): _switching_event = None def complete_switch(): """Mark the current switch as complete. Signals waiting requests.""" global _switching_event with _switching_lock: if _switching_event is not None and not _switching_event.is_set(): _switching_event.set() # ─── App ───────────────────────────────────────────────────────────────────── @asynccontextmanager async def lifespan(app: FastAPI): print("Intelligence Router starting") yield print("Intelligence Router shutting down") app = FastAPI(lifespan=lifespan) # ─── GET /v1/models — Issue #2 ────────────────────────────────────────────── @app.get("/v1") async def v1_root(): """OpenAI API root — return basic info for Hermes Desktop WebUI probe.""" return {"object": "list", "data": []} @app.get("/v1/models") async def get_models(): """OpenAI-compatible /v1/models endpoint proxying to Sidecar.""" async with httpx.AsyncClient(timeout=5.0) as client: try: resp = await client.get(f"{SIDECAR_URL}/models/available") profiles = resp.json() except Exception: return JSONResponse( status_code=503, content={"error": "Sidecar unavailable", "data": []}, ) models_data = [ {"id": p["id"], "object": "model", "owned_by": "sidecar"} for p in profiles ] return {"object": "list", "data": models_data} # ─── GET /health ───────────────────────────────────────────────────────────── @app.get("/health") async def health(): return {"status": "router_online"} # ─── Hermes Desktop Probe Endpoints ────────────────────────────────────────── # These endpoints are probed by Hermes Desktop to validate/identify the # provider before allowing model switching. Without them the desktop # returns 503 and refuses to switch models. @app.get("/v1/models/{model_id:path}") async def get_single_model(model_id: str): """OpenAI-compatible single model query. Proxied via Sidecar model list.""" async with httpx.AsyncClient(timeout=5.0) as client: try: resp = await client.get(f"{SIDECAR_URL}/models/available") profiles = resp.json() except Exception: return JSONResponse( status_code=503, content={"error": "Sidecar unavailable", "data": []}, ) for p in profiles: if p.get("id") == model_id: return {"id": p["id"], "object": "model", "owned_by": "sidecar"} return JSONResponse(status_code=404, content={"error": "model not found", "id": model_id}) @app.get("/api/tags") async def ollama_tags(): """Ollama-compatible model list for Hermes Desktop discovery.""" async with httpx.AsyncClient(timeout=5.0) as client: try: resp = await client.get(f"{SIDECAR_URL}/models/available") profiles = resp.json() except Exception: return JSONResponse(content={"models": []}) models = [] for p in profiles: models.append({ "name": p.get("id", ""), "model": p.get("id", ""), "modified_at": "2025-01-01T00:00:00Z", "size": 0, "digest": "", "details": {"format": "gguf", "family": p.get("name", "llm")}, }) return {"models": models} @app.get("/api/show") async def ollama_show_get(model: str = ""): """Ollama-compatible model info for Hermes Desktop discovery (GET variant). Some Hermes Desktop versions probe /api/show via GET with a ?model= parameter. """ return await _ollama_show_lookup(model) @app.post("/api/show") async def ollama_show_post(request: Request): """Ollama-compatible model info for Hermes Desktop discovery (POST variant).""" body = await request.body() body_data = json.loads(body) if body else {} model_name = body_data.get("model", "") return await _ollama_show_lookup(model_name) async def _ollama_show_lookup(model_name: str): """Shared logic for Ollama /api/show model info lookup.""" async with httpx.AsyncClient(timeout=5.0) as client: try: resp = await client.get(f"{SIDECAR_URL}/models/available") profiles = resp.json() except Exception: return JSONResponse(status_code=404, content={"error": "model not found"}) for p in profiles: if p.get("id") == model_name: # Extract actual context size from the profile's flags flags = p.get("flags", {}) ctx_size = str(flags.get("ctx-size", flags.get("n_ctx", "4096"))) return { "modelfile": "", "parameters": f"num_ctx {ctx_size}", "template": "", "details": { "format": "gguf", "family": p.get("name", "llm"), "parameter_size": ctx_size, }, "model_info": {"id": p.get("id", "")}, } return JSONResponse(status_code=404, content={"error": "model not found"}) @app.get("/api/v1/models") async def ollama_v1_models(): """Ollama /api/v1/models redirect — return same list as /v1/models.""" return await get_models() @app.get("/v1/props") async def llama_cpp_props(): """llama.cpp discovery endpoint for Hermes Desktop.""" async with httpx.AsyncClient(timeout=3.0) as client: try: resp = await client.get(f"{SIDECAR_URL}/models/status") status = resp.json() except Exception: status = {"active_profile": None, "llama_server_running": False} # Report the currently-running server version / capabilities return { "props": { "version": 1, "total_slots": 1, "chat_endpoint": "/v1/chat/completions", "completion_endpoint": "/v1/completions", "embedding_endpoint": "/v1/embeddings", "rerank_endpoint": "", "health_endpoint": "/health", }, "active_profile": status.get("active_profile"), "server_running": status.get("llama_server_running", False), } @app.get("/props") async def llm_props(): """Legacy llama.cpp discovery endpoint (same as /v1/props).""" return await llama_cpp_props() @app.get("/version") async def llm_version(): """llama.cpp version endpoint for Hermes Desktop.""" return {"version": "0.2.0", "build": "router-proxy", "commit": "intelligence-router"} # ─── GET /models/status ────────────────────────────────────────────────────── @app.get("/models/status") async def router_model_status(): """Proxy to Sidecar /models/status.""" async with httpx.AsyncClient(timeout=5.0) as client: try: resp = await client.get(f"{SIDECAR_URL}/models/status") return resp.json() except Exception: return JSONResponse( status_code=503, content={"error": "Sidecar unavailable"}, ) # ─── POST /models/switch — Issue #3 ────────────────────────────────────────── @app.post("/models/switch") async def router_model_switch(request: dict): """Proxy to Sidecar /models/switch.""" profile_id = request.get("profile_id") if not profile_id: raise HTTPException(status_code=400, detail="profile_id is required") async with httpx.AsyncClient(timeout=120.0) as client: try: resp = await client.post( f"{SIDECAR_URL}/models/switch", json={"profile_id": profile_id}, ) return resp.json() except Exception as e: return JSONResponse( status_code=503, content={"status": "error", "message": f"Sidecar error: {str(e)}"}, ) # ─── SSE Progress Stream Generator ─────────────────────────────────────────── async def sse_progress_stream(event: asyncio.Event): """Generate SSE events while a model switch is in progress.""" phases = [ ("stopping", "Stopping current model..."), ("starting", "Loading new model..."), ("waiting", "Waiting for model to be ready..."), ] for phase, msg in phases: if event.is_set(): yield _sse_format("model_switching", {"phase": phase, "message": msg}) yield _sse_format("model_switching", {"phase": "complete", "message": "Switch complete"}) return yield _sse_format("model_switching", {"phase": phase, "message": msg}) await asyncio.sleep(2) yield _sse_format("model_switching", {"phase": "complete", "message": "Processing your request..."}) # ─── Proxy Endpoint — Issues #2–#7 ─────────────────────────────────────────── @app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"]) async def proxy( request: Request, path: str, x_intelligence_level: str = Header(None), ): """ Smart Proxy with full fallback chain. Sidecar → Main PC → OpenRouter → LXC """ # Issue #6: Remove deprecated x-intelligence-level routing del x_intelligence_level # type: ignore[unused-coroutine] # Skip proxy for known sidecar admin endpoints if path.startswith("models/available") or \ path.startswith("models/switch") or \ path.startswith("models/status"): raise HTTPException(status_code=404, detail="Use the appropriate endpoint") # ── Determine target URL ────────────────────────────────────────────── target_url: Optional[str] = None error: Optional[str] = None # Circuit breaker check if not await circuit_breaker_check(): error = "circuit_open" else: # Query Sidecar for active model sidecar_status = None async with httpx.AsyncClient(timeout=3.0) as client: try: resp = await client.get(f"{SIDECAR_URL}/models/status") if resp.status_code == 200: sidecar_status = resp.json() circuit_reset() except Exception: error = "sidecar_down" if sidecar_status is None: circuit_record_failure() error = "sidecar_down" else: # Extract requested model from request body body = await request.body() body_data = json.loads(body) if body else {} requested_model = body_data.get("model") if requested_model and sidecar_status.get("active_profile") == requested_model: target_url = f"{MAIN_PC_BASE}/{path}" elif requested_model: # Trigger switch for a specific model request # Check if a switch is already in progress current_switch = await wait_for_switch() if current_switch is not None and not current_switch.is_set(): # Another request started the switch — queue this one try: wait_evt = await queue_request() except HTTPException as he: raise # SSE progress while waiting async def stream_with_sse(): sse_gen = sse_progress_stream(wait_evt) try: await wait_evt.wait() async for sse_chunk in sse_gen: yield sse_chunk complete_switch() drain_queue() async with httpx.AsyncClient(timeout=60.0) as c: req_headers = dict(request.headers) req_headers.pop("host", None) async with c.stream( request.method, f"{MAIN_PC_BASE}/{path}", content=body, headers=req_headers, ) as resp: async for chunk in resp.aiter_bytes(): yield chunk finally: # Clean up sse_gen try: await sse_gen.aclose() except Exception: pass return StreamingResponse( stream_with_sse(), media_type="text/event-stream", ) # First request triggers the switch await start_switch() # Create event for tracking try: async with httpx.AsyncClient(timeout=120.0) as client: switch_resp = await client.post( f"{SIDECAR_URL}/models/switch", json={"profile_id": requested_model}, ) switch_result = switch_resp.json() if switch_result.get("status") == "ready": complete_switch() drain_queue() target_url = f"{MAIN_PC_BASE}/{path}" else: error = "switch_failed" except Exception as e: circuit_record_failure() error = f"switch_error: {str(e)}" else: # No model in request body (probe/GET/non-chat request) — # route to the currently active backend when available, # or fall through to the fallback chain. if sidecar_status.get("active_profile") and sidecar_status.get("llama_server_running"): target_url = f"{MAIN_PC_BASE}/{path}" # ── Fallback chain ──────────────────────────────────────────────────── if target_url is None: if error in ("sidecar_down", "circuit_open", "switch_failed"): if OPENROUTER_API_KEY: target_url = f"{OPENROUTER_BASE}/{path}" else: target_url = f"{FALLBACK_SLM_URL}/{path}" if not target_url: return Response(content="No valid target available", status_code=503) # ── Prepare request ─────────────────────────────────────────────────── body = await request.body() headers = dict(request.headers) headers.pop("host", None) headers.pop("content-length", None) # ── Execute request with fallback ───────────────────────────────────── exec_error: Optional[Exception] = None async def execute(target: str) -> Optional[Response]: async with httpx.AsyncClient(timeout=60.0) as client: accept_header = request.headers.get("accept", "") if "text/event-stream" in accept_header or "application/x-ndjson" in accept_header: async def gen(): async with client.stream( request.method, target, content=body, headers=headers, ) as resp: if resp.status_code != 200: print(f"PROXY: {target} returned {resp.status_code} during SSE stream", flush=True) async for chunk in resp.aiter_bytes(): yield chunk return StreamingResponse(gen(), status_code=200) resp = await client.request( method=request.method, url=target, content=body, headers=headers, ) if resp.status_code != 200: body_preview = resp.content[:500].decode("utf-8", errors="replace") print( f"PROXY: {request.method} {target} returned {resp.status_code}: {body_preview}", flush=True, ) return Response( content=resp.content, status_code=resp.status_code, headers=dict(resp.headers), ) primary_result = None try: primary_result = await execute(target_url) except Exception: pass # Falls through to fallback chain if primary_result is not None: return primary_result # ── Fallback chain: Main PC → OpenRouter → LXC ────────────────────── fallback_order = [] # Determine which backends are still viable if target_url.startswith(MAIN_PC_BASE): if OPENROUTER_API_KEY: fallback_order.append((OPENROUTER_BASE, OPENROUTER_API_KEY)) fallback_order.append((FALLBACK_SLM_URL, None)) elif target_url.startswith(OPENROUTER_BASE): fallback_order.append((FALLBACK_SLM_URL, None)) for fb_base, fb_key in fallback_order: # Check health before routing async with httpx.AsyncClient(timeout=10.0) as client: try: resp = await client.get(f"{fb_base}/v1/models") if resp.status_code != 200: continue fb_url = f"{fb_base}/{path}" if fb_key: headers["Authorization"] = f"Bearer {fb_key}" result = await execute(fb_url) if result is not None: return result except Exception: continue return Response( content="No valid target available (all backends down)", status_code=503, )