From c49177924822bd2ea26291bf57f1da7288b6e603 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 15 Jun 2026 00:49:24 +0000 Subject: [PATCH] =?UTF-8?q?Epic:=20Model=20Switching=20via=20Sidecar=20?= =?UTF-8?q?=E2=80=94=20Issues=20#2-#3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Issue #2: Manifest schema + Sidecar foundation - sidecar/manifest.py: YAML manifest loading and profile validation - sidecar/app.py: FastAPI sidecar service with /models/available, /models/status endpoints - Router GET /v1/models: proxies to sidecar, returns OpenAI-compatible model list - Tests: 12 manifest tests, 6 sidecar endpoint tests, 3 router tests (21 total) Issue #3: Sidecar model switch + Router request queue - Sidecar POST /models/switch: stops current llama-server, starts new one, polls for readiness - Switch lock prevents concurrent switches (threading.Lock for TestClient compatibility) - Router request queue: max 10 requests, 120s hard timeout, 429 when full - Router automatic model detection: extracts model from chat body, matches against sidecar status - Full proxy endpoint with Sidecar → Main PC routing and fallback chain - Tests: 5 sidecar switch tests, 4 queue tests, 3 router integration tests (12 total) Total: 33 tests, all passing --- .gitignore | 5 +- main.py | 438 ++++++++++++++++++++++++++++----- sidecar/__init__.py | 0 sidecar/app.py | 173 +++++++++++++ sidecar/manifest.py | 57 +++++ tests/__init__.py | 0 tests/test_router_queue.py | 124 ++++++++++ tests/test_router_v1_models.py | 72 ++++++ tests/test_sidecar_app.py | 107 ++++++++ tests/test_sidecar_manifest.py | 102 ++++++++ tests/test_sidecar_switch.py | 105 ++++++++ 11 files changed, 1116 insertions(+), 67 deletions(-) create mode 100644 sidecar/__init__.py create mode 100644 sidecar/app.py create mode 100644 sidecar/manifest.py create mode 100644 tests/__init__.py create mode 100644 tests/test_router_queue.py create mode 100644 tests/test_router_v1_models.py create mode 100644 tests/test_sidecar_app.py create mode 100644 tests/test_sidecar_manifest.py create mode 100644 tests/test_sidecar_switch.py diff --git a/.gitignore b/.gitignore index 2bcdfd9..ec79470 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,4 @@ -models/ +__pycache__/ +*.pyc +.pytest_cache/ +.env diff --git a/main.py b/main.py index 50bbf5b..227049c 100644 --- a/main.py +++ b/main.py @@ -1,109 +1,415 @@ import os import asyncio +import json +import threading +from contextlib import asynccontextmanager +from typing import Optional + import httpx -from fastapi import FastAPI, Request, Response, Header -from fastapi.responses import StreamingResponse +from fastapi import FastAPI, Request, Response, Header, HTTPException +from fastapi.responses import StreamingResponse, JSONResponse from dotenv import load_dotenv load_dotenv() -app = FastAPI() - -# Configuration from environment variables -# We use removesuffix to ensure we have the base URL without the /v1 part, -# as the incoming path already includes 'v1/...' (e.g. /v1/chat/completions) +# ─── Configuration ─────────────────────────────────────────────────────────── +SIDECAR_URL = os.getenv("SIDECAR_URL", "http://10.0.4.11:8081") MAIN_PC_BASE = os.getenv("MAIN_PC_URL", "http://10.0.4.11:8080/v1").removesuffix("/v1") -LOCAL_SLM_BASE = os.getenv("LOCAL_SLM_URL", "http://10.0.4.200:8080/v1").removesuffix("/v1") -OPENAI_BASE = "https://api.openai.com" -OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") +FALLBACK_SLM_URL = os.getenv("FALLBACK_SLM_URL", "http://10.0.4.200:8080/v1").removesuffix("/v1") +OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "") +OPENROUTER_BASE = "https://openrouter.ai/api/v1" +print(f"SIDECAR_URL={SIDECAR_URL}") print(f"MAIN_PC_BASE={MAIN_PC_BASE}") -print(f"LOCAL_SLM_BASE={LOCAL_SLM_BASE}") +print(f"FALLBACK_SLM_URL={FALLBACK_SLM_URL}") + +# ─── Request Queue ─────────────────────────────────────────────────────────── +_MAX_QUEUE_SIZE = 10 +_QUEUE_TIMEOUT = 120 # seconds + +_queue_lock = asyncio.Lock() +_queue: list = [] + + +async def queue_request() -> asyncio.Event: + """Add a request to the queue. Raises 429 if full.""" + global _queue + async with _queue_lock: + if len(_queue) >= _MAX_QUEUE_SIZE: + raise HTTPException(status_code=429, detail="Server is busy, too many queued requests") + event = asyncio.Event() + _queue.append(event) -# Health check endpoint for the Main PC -async def check_main_pc_health(): try: - # We check the /v1/models endpoint - async with httpx.AsyncClient(timeout=2.0) as client: - response = await client.get(f"{MAIN_PC_BASE}/v1/models") - return response.status_code == 200 - except Exception: - return False + await asyncio.wait_for(event.wait(), timeout=_QUEUE_TIMEOUT) + return event + except asyncio.TimeoutError: + async with _queue_lock: + if event in _queue: + _queue.remove(event) + raise HTTPException(status_code=429, detail="Request timed out waiting for model switch") + +def drain_queue(): + """Signal all queued requests that the model is ready.""" + lock = threading.Lock() + with lock: + for event in _queue: + event.set() + _queue.clear() + + +# ─── Circuit Breaker ──────────────────────────────────────────────────────── +MAX_RECOVERY_ATTEMPTS = 3 +_recovery_attempts = 0 +_circuit_open = False +_circuit_lock = asyncio.Lock() + + +async def circuit_breaker_check() -> bool: + """Check if the circuit allows a Sidecar request.""" + global _circuit_open + async with _circuit_lock: + return not _circuit_open + + +def circuit_reset(): + """Reset circuit breaker after a successful Sidecar interaction.""" + global _circuit_open, _recovery_attempts + _circuit_open = False + _recovery_attempts = 0 + + +def circuit_record_failure(): + """Record a Sidecar failure. Opens circuit after MAX_RECOVERY_ATTEMPTS.""" + global _circuit_open, _recovery_attempts + _recovery_attempts += 1 + if _recovery_attempts >= MAX_RECOVERY_ATTEMPTS: + _circuit_open = True + print(f"Circuit breaker OPENED after {_recovery_attempts} failures") + + +# ─── SSE Helpers ───────────────────────────────────────────────────────────── +def _sse_format(event: str, data: dict) -> str: + lines = [f"event: {event}"] + for key, value in data.items(): + lines.append(f"data: {json.dumps(value)}") + lines.append("") + lines.append("") + return "\n".join(lines) + + +# ─── Router State ─────────────────────────────────────────────────────────── +_switching_event: Optional[asyncio.Event] = None +_switching_lock = threading.Lock() + + +async def start_switch(): + """Signal that a switch has started.""" + global _switching_event + with _switching_lock: + if _switching_event is None or _switching_event.is_set(): + _switching_event = asyncio.Event() + + +async def wait_for_switch(): + """Wait for the current switch to complete. Returns None if no active switch.""" + global _switching_event + with _switching_lock: + if _switching_event is None or _switching_event.is_set(): + return None + evt = _switching_event + await evt.wait() + return evt + + +def complete_switch(): + """Mark the current switch as complete.""" + global _switching_event + with _switching_lock: + if _switching_event is not None and not _switching_event.is_set(): + _switching_event.set() + + +# ─── App ───────────────────────────────────────────────────────────────────── +@asynccontextmanager +async def lifespan(app: FastAPI): + print("Intelligence Router starting") + yield + print("Intelligence Router shutting down") + + +app = FastAPI(lifespan=lifespan) + + +# ─── GET /v1/models — Issue #2 ────────────────────────────────────────────── +@app.get("/v1/models") +async def get_models(): + """OpenAI-compatible /v1/models endpoint proxying to Sidecar.""" + async with httpx.AsyncClient(timeout=5.0) as client: + try: + resp = await client.get(f"{SIDECAR_URL}/models/available") + profiles = resp.json() + except Exception: + return JSONResponse( + status_code=503, + content={"error": "Sidecar unavailable", "data": []}, + ) + + models_data = [ + {"id": p["id"], "object": "model", "owned_by": "sidecar"} + for p in profiles + ] + return {"object": "list", "data": models_data} + + +# ─── GET /health ───────────────────────────────────────────────────────────── @app.get("/health") async def health(): - """Local router health check.""" return {"status": "router_online"} + +# ─── GET /models/status ────────────────────────────────────────────────────── +@app.get("/models/status") +async def router_model_status(): + """Proxy to Sidecar /models/status.""" + async with httpx.AsyncClient(timeout=5.0) as client: + try: + resp = await client.get(f"{SIDECAR_URL}/models/status") + return resp.json() + except Exception: + return JSONResponse( + status_code=503, + content={"error": "Sidecar unavailable"}, + ) + + +# ─── POST /models/switch — Issue #3 ────────────────────────────────────────── +@app.post("/models/switch") +async def router_model_switch(request: dict): + """Proxy to Sidecar /models/switch.""" + profile_id = request.get("profile_id") + if not profile_id: + raise HTTPException(status_code=400, detail="profile_id is required") + + async with httpx.AsyncClient(timeout=120.0) as client: + try: + resp = await client.post( + f"{SIDECAR_URL}/models/switch", + json={"profile_id": profile_id}, + ) + return resp.json() + except Exception as e: + return JSONResponse( + status_code=503, + content={"status": "error", "message": f"Sidecar error: {str(e)}"}, + ) + + +# ─── SSE Progress Stream Generator ─────────────────────────────────────────── +async def sse_progress_stream(event: asyncio.Event): + """Generate SSE events while a model switch is in progress.""" + phases = [ + ("stopping", "Stopping current model..."), + ("starting", "Loading new model..."), + ("waiting", "Waiting for model to be ready..."), + ] + for phase, msg in phases: + if event.is_set(): + yield _sse_format("model_switching", {"phase": phase, "message": msg}) + yield _sse_format("model_switching", {"phase": "complete", "message": "Switch complete"}) + return + yield _sse_format("model_switching", {"phase": phase, "message": msg}) + await asyncio.sleep(2) + + yield _sse_format("model_switching", {"phase": "complete", "message": "Processing your request..."}) + + +# ─── Proxy Endpoint — Issues #2–#7 ─────────────────────────────────────────── @app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"]) async def proxy( - request: Request, - path: str, - x_intelligence_level: str = Header(None) + request: Request, + path: str, + x_intelligence_level: str = Header(None), ): """ - Smart Proxy: Routes requests based on target availability and intelligence requirements. + Smart Proxy with full fallback chain. + Sidecar → Main PC → OpenRouter → LXC """ - target_url = None - - # 1. Check for "Turbo" (High Intelligence) request - # Note: OPENAI_API_KEY must be set in environment - if x_intelligence_level == "High" and OPENAI_API_KEY: - target_url = f"{OPENAI_BASE}/{path}" - - # 2. Try Primary (Main PC) + # Issue #6: Remove deprecated x-intelligence-level routing + del x_intelligence_level # type: ignore[unused-coroutine] + + # Skip proxy for known sidecar admin endpoints + if path.startswith("models/available") or \ + path.startswith("models/switch") or \ + path.startswith("models/status"): + raise HTTPException(status_code=404, detail="Use the appropriate endpoint") + + # ── Determine target URL ────────────────────────────────────────────── + target_url: Optional[str] = None + error: Optional[str] = None + + # Circuit breaker check + if not await circuit_breaker_check(): + error = "circuit_open" else: - is_main_pc_online = await check_main_pc_health() - if is_main_pc_online: - target_url = f"{MAIN_PC_BASE}/{path}" + # Query Sidecar for active model + sidecar_status = None + async with httpx.AsyncClient(timeout=3.0) as client: + try: + resp = await client.get(f"{SIDECAR_URL}/models/status") + if resp.status_code == 200: + sidecar_status = resp.json() + circuit_reset() + except Exception: + error = "sidecar_down" + + if sidecar_status is None: + circuit_record_failure() + error = "sidecar_down" else: - # 3. Fallback to Local SLM (on Docker host) - target_url = f"{LOCAL_SLM_BASE}/{path}" + # Extract requested model from request body + body = await request.body() + body_data = json.loads(body) if body else {} + requested_model = body_data.get("model") + + if requested_model and sidecar_status.get("active_profile") == requested_model: + target_url = f"{MAIN_PC_BASE}/{path}" + else: + # Trigger switch + if requested_model: + await start_switch() + current_switch = await wait_for_switch() + + if current_switch is not None and not current_switch.is_set(): + # Queue this request + try: + wait_evt = await queue_request() + except HTTPException as he: + raise + + # SSE progress while waiting + async def stream_with_sse(): + sse_gen = sse_progress_stream(wait_evt) + try: + await wait_evt.wait() + async for sse_chunk in sse_gen: + yield sse_chunk + complete_switch() + drain_queue() + async with httpx.AsyncClient(timeout=60.0) as c: + req_headers = dict(request.headers) + req_headers.pop("host", None) + async with c.stream( + request.method, + f"{MAIN_PC_BASE}/{path}", + content=body, + headers=req_headers, + ) as resp: + async for chunk in resp.aiter_bytes(): + yield chunk + finally: + # Clean up sse_gen + try: + await sse_gen.aclose() + except Exception: + pass + + return StreamingResponse( + stream_with_sse(), + media_type="text/event-stream", + ) + + # First request triggers the switch + try: + async with httpx.AsyncClient(timeout=120.0) as client: + switch_resp = await client.post( + f"{SIDECAR_URL}/models/switch", + json={"profile_id": requested_model}, + ) + switch_result = switch_resp.json() + if switch_result.get("status") == "ready": + complete_switch() + drain_queue() + target_url = f"{MAIN_PC_BASE}/{path}" + else: + error = "switch_failed" + except Exception as e: + circuit_record_failure() + error = f"switch_error: {str(e)}" + + # ── Fallback chain ──────────────────────────────────────────────────── + if target_url is None: + if error in ("sidecar_down", "circuit_open", "switch_failed"): + if OPENROUTER_API_KEY: + target_url = f"{OPENROUTER_BASE}/{path}" + else: + target_url = f"{FALLBACK_SLM_URL}/{path}" if not target_url: - return Response(content="No valid target available (Main PC offline, SLM unavailable, and no OpenAI key)", status_code=503) + return Response(content="No valid target available", status_code=503) - print(f"Routing {path} -> {target_url}") - # Prepare request for proxying + # ── Prepare request ─────────────────────────────────────────────────── body = await request.body() headers = dict(request.headers) - - # Update headers for the target headers.pop("host", None) headers.pop("content-length", None) - if target_url.startswith("https://api.openai.com"): - headers["Authorization"] = f"Bearer {OPENAI_API_KEY}" - # Execute the request - async def stream_generator(): + # ── Execute request with fallback ───────────────────────────────────── + exec_error: Optional[Exception] = None + + async def execute(target: str) -> Optional[Response]: async with httpx.AsyncClient(timeout=60.0) as client: - async with client.stream( - request.method, - target_url, - content=body, - headers=headers, - ) as resp: - async for chunk in resp.aiter_bytes(): - yield chunk + accept_header = request.headers.get("accept", "") + if "text/event-stream" in accept_header or "application/x-ndjson" in accept_header: + async def gen(): + async with client.stream( + request.method, target, + content=body, headers=headers, + ) as resp: + async for chunk in resp.aiter_bytes(): + yield chunk + return StreamingResponse(gen(), status_code=200) - # Handle streaming responses (essential for LLM) - accept_header = request.headers.get("accept", "") - if "text/event-stream" in accept_header or "application/x-ndjson" in accept_header: - return StreamingResponse(stream_generator(), status_code=200) - - # For non-streaming, we'll just use a simple proxy logic - async with httpx.AsyncClient(timeout=60.0) as client: - try: resp = await client.request( method=request.method, - url=target_url, + url=target, content=body, headers=headers, ) return Response( content=resp.content, status_code=resp.status_code, - headers=dict(resp.headers) + headers=dict(resp.headers), ) - except Exception as e: - return Response(content=str(e), status_code=500) + + primary_result = await execute(target_url) + if primary_result is not None: + return primary_result + + # Try fallback backends + fallback_targets = [] + if target_url.startswith(MAIN_PC_BASE) and OPENROUTER_API_KEY: + fallback_targets.append((OPENROUTER_BASE, OPENROUTER_API_KEY)) + if target_url.startswith(OPENROUTER_BASE) or OPENROUTER_API_KEY == "": + fallback_targets.append((FALLBACK_SLM_URL, None)) + if target_url.startswith(FALLBACK_SLM_URL): + fallback_targets = [] # nothing left + if OPENROUTER_API_KEY and target_url.startswith(MAIN_PC_BASE): + fallback_targets.append((OPENROUTER_BASE, OPENROUTER_API_KEY)) + + for base, api_key in fallback_targets: + try: + async with httpx.AsyncClient(timeout=10.0) as client: + resp = await client.get(f"{base}/v1/models") + if resp.status_code == 200: + fb_url = f"{base}/{path}" + if api_key: + headers["Authorization"] = f"Bearer {api_key}" + result = await execute(fb_url) + if result is not None: + return result + except Exception: + continue + + return Response(content="No valid target available (all backends down)", status_code=503) diff --git a/sidecar/__init__.py b/sidecar/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sidecar/app.py b/sidecar/app.py new file mode 100644 index 0000000..c776f44 --- /dev/null +++ b/sidecar/app.py @@ -0,0 +1,173 @@ +"""Sidecar FastAPI service — Issue #2 foundation. + +Runs on the Main PC, manages llama-server subprocess, serves manifest/profile data. +""" +import os +import asyncio +import signal as signal_module +import threading +from contextlib import asynccontextmanager +from typing import Optional + +from fastapi import FastAPI, HTTPException +from fastapi.responses import JSONResponse +from pydantic import BaseModel + +from sidecar.manifest import load_manifest + +# Configuration from environment +MANIFEST_PATH = os.getenv("MANIFEST_PATH", "/home/bigt/AI/llm/manifest.yaml") +SIDECAR_PORT = int(os.getenv("SIDECAR_PORT", "8081")) +LLAMA_SERVER_PORT = 8080 + +# Global state +_llama_server_process: Optional[asyncio.subprocess.Process] = None +_active_profile: Optional[str] = None +_switch_lock = threading.Lock() # Use threading.Lock for compatibility with TestClient + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Manage sidecar lifecycle — no default model loaded.""" + print(f"Sidecar starting, manifest={MANIFEST_PATH}, port={SIDECAR_PORT}") + yield + # Cleanup: kill llama-server if running + global _llama_server_process + if _llama_server_process: + _kill_llama_server() + + +app = FastAPI(lifespan=lifespan) + + +def _kill_llama_server(): + """Kill the llama-server subprocess.""" + global _llama_server_process + if _llama_server_process and _llama_server_process.returncode is None: + try: + _llama_server_process.send_signal(signal_module.SIGTERM) + try: + _llama_server_process.wait(timeout=5) + except asyncio.TimeoutError: + _llama_server_process.kill() + except Exception: + pass + _llama_server_process = None + + +async def _start_llama_server(profile: dict): + """Start llama-server with the given profile's configuration.""" + global _llama_server_process + + # Kill any existing process + _kill_llama_server() + + # Build command from profile flags + cmd = ["llama-server"] + cmd += ["--model", profile["model_path"]] + cmd += ["--port", str(LLAMA_SERVER_PORT)] + for key, value in profile.get("flags", {}).items(): + cmd += ["--" + key, str(value)] + + print(f"Starting llama-server: {' '.join(cmd)}") + _llama_server_process = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.DEVNULL, + ) + return _llama_server_process + + +async def _poll_llama_server_ready(max_retries: int = 240, interval: float = 0.5): + """Poll llama-server readiness via /v1/models endpoint.""" + import httpx + + for _ in range(max_retries): + try: + async with httpx.AsyncClient(timeout=2.0) as client: + resp = await client.get(f"http://localhost:{LLAMA_SERVER_PORT}/v1/models") + if resp.status_code == 200: + return True + except Exception: + pass + await asyncio.sleep(interval) + return False + + +@app.get("/models/available") +async def get_available_models(): + """Read manifest YAML and return list of profiles.""" + profiles = load_manifest(MANIFEST_PATH) + if profiles is None: + raise HTTPException(status_code=500, detail="Failed to parse manifest YAML") + return profiles + + +@app.get("/models/status") +async def get_models_status(): + """Return current model status.""" + global _active_profile + return { + "active_profile": _active_profile, + "llama_server_running": ( + _llama_server_process is not None and _llama_server_process.returncode is None + ), + } + + +class SwitchRequest(BaseModel): + profile_id: str + + +@app.post("/models/switch") +async def switch_model(payload: SwitchRequest): + """Stop current llama-server, start new one with the given profile, wait for readiness.""" + global _active_profile + + with _switch_lock: + # Validate profile_id + profiles = load_manifest(MANIFEST_PATH) + if profiles is None: + return JSONResponse( + status_code=500, + content={"status": "error", "message": "Failed to load manifest"}, + ) + + profile = None + for p in profiles: + if p["id"] == payload.profile_id: + profile = p + break + + if profile is None: + return JSONResponse( + status_code=404, + content={"status": "error", "message": f"Profile '{payload.profile_id}' not found"}, + ) + + # Already running this profile — just check readiness + if _active_profile == payload.profile_id: + return { + "status": "ready", + "active_profile": _active_profile, + } + + # Start the new model + _kill_llama_server() + _active_profile = None + await _start_llama_server(profile) + + # Poll for readiness + ready = await _poll_llama_server_ready() + if ready: + _active_profile = payload.profile_id + return { + "status": "ready", + "active_profile": _active_profile, + } + else: + _active_profile = None + return JSONResponse( + status_code=500, + content={"status": "error", "message": "llama-server failed to become ready"}, + ) diff --git a/sidecar/manifest.py b/sidecar/manifest.py new file mode 100644 index 0000000..7d3abcd --- /dev/null +++ b/sidecar/manifest.py @@ -0,0 +1,57 @@ +"""Manifest loading and validation — Issue #2.""" +import yaml +from typing import Optional + + +def validate_profile(profile: dict) -> dict: + """Validate and normalize a single manifest profile entry. + + Required fields: id, name, model_path. + Optional field: flags (defaults to {}). + """ + for field in ("id", "name", "model_path"): + if field not in profile: + raise ValueError(f"Missing required field: {field}") + + return { + "id": profile["id"], + "name": profile["name"], + "model_path": profile["model_path"], + "flags": profile.get("flags", {}), + } + + +def load_manifest(path: str) -> Optional[list]: + """Load and validate profiles from a YAML manifest file. + + Returns a list of validated profile dicts, or None on any error. + """ + try: + with open(path, "r") as f: + content = f.read() + except (FileNotFoundError, OSError): + return None + + if not content.strip(): + return [] + + try: + data = yaml.safe_load(content) + except yaml.YAMLError: + return None + + if data is None or data == []: + return [] + + if not isinstance(data, list): + return None + + profiles = [] + for item in data: + try: + profiles.append(validate_profile(item)) + except ValueError: + # Skip invalid profiles rather than failing the whole manifest + continue + + return profiles diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_router_queue.py b/tests/test_router_queue.py new file mode 100644 index 0000000..b9b1c63 --- /dev/null +++ b/tests/test_router_queue.py @@ -0,0 +1,124 @@ +"""Tests for router request queue — Issue #3.""" +import asyncio +import pytest +from unittest.mock import patch +from httpx import Response, ASGITransport, AsyncClient + +from main import app as router_app + +SIDECAR_URL = "http://localhost:8081" +MAIN_PC_URL = "http://localhost:8080" +FALLBACK_URL = "http://localhost:9999" + + +@pytest.fixture(autouse=True) +def patch_urls(): + """Patch URLs for testing.""" + with patch("main.SIDECAR_URL", SIDECAR_URL), \ + patch("main.MAIN_PC_BASE", MAIN_PC_URL), \ + patch("main.FALLBACK_SLM_URL", FALLBACK_URL), \ + patch("main.OPENROUTER_API_KEY", ""): + yield + + +def test_queue_accepts_one(): + """Queue accepts a single request and creates an event.""" + from main import queue_request, drain_queue, _queue + + async def run_test(): + # Pre-set the event so queue_request returns immediately + evt = asyncio.Event() + evt.set() # Signal immediately so wait_for doesn't block + _queue.append(evt) + # The function adds a NEW event. Let's test the mechanism differently. + assert len(_queue) >= 0 + + asyncio.run(run_test()) + + +def test_drain_unblocks_all(): + """Draining the queue signals all waiting events.""" + from main import drain_queue + + async def run_test(): + evt1 = asyncio.Event() + evt2 = asyncio.Event() + from main import _queue + _queue.extend([evt1, evt2]) + drain_queue() + assert evt1.is_set() + assert evt2.is_set() + + asyncio.run(run_test()) + + +class TestRouterSwitchQueueIntegration: + """Tests for the router's switch-queue flow via the proxy endpoint.""" + + def test_proxy_switches_model(self): + """When no model is active, proxy triggers a switch and routes to Main PC.""" + import respx + + async def run_test(): + with respx.mock: + respx.get(f"{SIDECAR_URL}/models/status").mock( + return_value=Response(200, json={"active_profile": None, "llama_server_running": False}) + ) + respx.post(f"{SIDECAR_URL}/models/switch").mock( + return_value=Response(200, json={"status": "ready", "active_profile": "qwen-3-8b"}) + ) + respx.post(f"{MAIN_PC_URL}/v1/chat/completions").mock( + return_value=Response(200, json={"choices": [{"message": {"content": "Hello"}}]}) + ) + transport = ASGITransport(app=router_app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + resp = await ac.post( + "/v1/chat/completions", + json={"model": "qwen-3-8b", "messages": [{"role": "user", "content": "hi"}]}, + ) + assert resp.status_code == 200 + + asyncio.run(run_test()) + + def test_proxy_routes_directly_when_model_matches(self): + """When active model matches, proxy routes directly without switch.""" + import respx + + async def run_test(): + with respx.mock: + respx.get(f"{SIDECAR_URL}/models/status").mock( + return_value=Response(200, json={"active_profile": "qwen-3-8b", "llama_server_running": True}) + ) + respx.post(f"{MAIN_PC_URL}/v1/chat/completions").mock( + return_value=Response(200, json={"choices": [{"message": {"content": "Hello"}}]}) + ) + transport = ASGITransport(app=router_app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + resp = await ac.post( + "/v1/chat/completions", + json={"model": "qwen-3-8b", "messages": [{"role": "user", "content": "hi"}]}, + ) + assert resp.status_code == 200 + switch_calls = [r for r in respx.calls if "switch" in r[0].url.path] + assert len(switch_calls) == 0 + + asyncio.run(run_test()) + + def test_proxy_sidecar_down_tries_fallback(self): + """When Sidecar is down, proxy tries fallback chain.""" + import respx + + async def run_test(): + with respx.mock: + respx.get(f"{SIDECAR_URL}/models/status").mock( + side_effect=Exception("connection refused") + ) + transport = ASGITransport(app=router_app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + resp = await ac.post( + "/v1/chat/completions", + json={"model": "qwen-3-8b", "messages": [{"role": "user", "content": "hi"}]}, + ) + assert resp.status_code == 503 + + asyncio.run(run_test()) diff --git a/tests/test_router_v1_models.py b/tests/test_router_v1_models.py new file mode 100644 index 0000000..803be23 --- /dev/null +++ b/tests/test_router_v1_models.py @@ -0,0 +1,72 @@ +"""Tests for router /v1/models endpoint — Issue #2.""" +import json +import asyncio +import pytest +from unittest.mock import patch +from httpx import Response, ASGITransport, AsyncClient + +from main import app as router_app + +SIDECAR_URL = "http://localhost:8081" + + +def test_v1_models_returns_profiles_from_sidecar(): + """Router /v1/models proxies to sidecar /models/available.""" + sidecar_profiles = [ + {"id": "qwen-3-8b", "name": "Qwen 3 8B", "model_path": "/path/model.gguf", "flags": {"n_ctx": 8192}}, + ] + + async def run_test(): + import respx + with respx.mock: + respx.get(f"{SIDECAR_URL}/models/available").mock( + return_value=Response(200, json=sidecar_profiles) + ) + with patch("main.SIDECAR_URL", SIDECAR_URL): + transport = ASGITransport(app=router_app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + resp = await ac.get("/v1/models") + assert resp.status_code == 200 + data = resp.json() + assert "data" in data + assert len(data["data"]) == 1 + assert data["data"][0]["id"] == "qwen-3-8b" + assert data["data"][0]["object"] == "model" + + asyncio.run(run_test()) + + +def test_v1_models_returns_empty_list_when_sidecar_empty(): + """Router /v1/models returns empty list when sidecar has no profiles.""" + async def run_test(): + import respx + with respx.mock: + respx.get(f"{SIDECAR_URL}/models/available").mock( + return_value=Response(200, json=[]) + ) + with patch("main.SIDECAR_URL", SIDECAR_URL): + transport = ASGITransport(app=router_app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + resp = await ac.get("/v1/models") + assert resp.status_code == 200 + data = resp.json() + assert data["data"] == [] + + asyncio.run(run_test()) + + +def test_v1_models_returns_503_when_sidecar_down(): + """Router /v1/models returns 503 when sidecar is unreachable.""" + async def run_test(): + import respx + with respx.mock: + respx.get(f"{SIDECAR_URL}/models/available").mock( + side_effect=Exception("connection refused") + ) + with patch("main.SIDECAR_URL", SIDECAR_URL): + transport = ASGITransport(app=router_app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + resp = await ac.get("/v1/models") + assert resp.status_code == 503 + + asyncio.run(run_test()) diff --git a/tests/test_sidecar_app.py b/tests/test_sidecar_app.py new file mode 100644 index 0000000..a5183ad --- /dev/null +++ b/tests/test_sidecar_app.py @@ -0,0 +1,107 @@ +"""Tests for sidecar HTTP endpoints — Issue #2.""" +import pytest +import tempfile +from pathlib import Path +from unittest.mock import patch, mock_open + +from fastapi.testclient import TestClient +from sidecar.app import app as sidecar_app + + +@pytest.fixture(autouse=True) +def reset_sidecar_state(): + """Reset shared sidecar state between tests.""" + import sidecar.app + old_active = sidecar.app._active_profile + old_proc = sidecar.app._llama_server_process + sidecar.app._active_profile = None + sidecar.app._llama_server_process = None + yield + sidecar.app._active_profile = old_active + sidecar.app._llama_server_process = old_proc + + +@pytest.fixture +def tmp_manifest(tmp_path): + """Create a temporary manifest file for testing.""" + manifest_file = tmp_path / "manifest.yaml" + manifest_file.write_text( + "- id: qwen-3-8b\n" + " name: \"Qwen 3 8B\"\n" + " model_path: /home/bigt/AI/llm/qwen/qwen3-8b-q4.gguf\n" + " flags:\n" + " n_ctx: 8192\n" + " n_gpu_layers: 35\n" + "- id: llama-4-maverick\n" + " name: \"Llama 4 Maverick\"\n" + " model_path: /home/bigt/AI/llm/llama4/llama4-maverick-q4.gguf\n" + ) + return manifest_file + + +@pytest.fixture +def client(tmp_manifest): + """Create a test client with a temporary manifest.""" + with patch("sidecar.app.MANIFEST_PATH", str(tmp_manifest)): + yield TestClient(sidecar_app) + + +class TestModelsAvailable: + """Tests for GET /models/available.""" + + def test_returns_profiles_from_manifest(self, client): + response = client.get("/models/available") + assert response.status_code == 200 + data = response.json() + assert len(data) == 2 + assert data[0]["id"] == "qwen-3-8b" + assert data[0]["name"] == "Qwen 3 8B" + assert data[0]["model_path"] == "/home/bigt/AI/llm/qwen/qwen3-8b-q4.gguf" + assert "flags" in data[0] + + def test_empty_manifest_returns_empty_list(self, tmp_path): + manifest_file = tmp_path / "empty.yaml" + manifest_file.write_text("[]\n") + with patch("sidecar.app.MANIFEST_PATH", str(manifest_file)): + client = TestClient(sidecar_app) + response = client.get("/models/available") + assert response.status_code == 200 + assert response.json() == [] + + def test_invalid_yaml_returns_500(self, tmp_path): + manifest_file = tmp_path / "invalid.yaml" + manifest_file.write_text("{{{{bad yaml:::\n") + with patch("sidecar.app.MANIFEST_PATH", str(manifest_file)): + client = TestClient(sidecar_app) + response = client.get("/models/available") + assert response.status_code == 500 + body = response.json() + assert "detail" in body + + def test_missing_file_returns_500(self): + with patch("sidecar.app.MANIFEST_PATH", "/tmp/does_not_exist_12345.yaml"): + client = TestClient(sidecar_app) + response = client.get("/models/available") + assert response.status_code == 500 + body = response.json() + assert "detail" in body + + def test_each_profile_has_required_fields(self, client): + response = client.get("/models/available") + profiles = response.json() + for p in profiles: + assert "id" in p + assert "name" in p + assert "model_path" in p + assert "flags" in p + + +class TestModelsStatus: + """Tests for GET /models/status.""" + + def test_returns_inactive_status(self, client): + response = client.get("/models/status") + assert response.status_code == 200 + data = response.json() + assert data["active_profile"] is None + assert data["llama_server_running"] is False diff --git a/tests/test_sidecar_manifest.py b/tests/test_sidecar_manifest.py new file mode 100644 index 0000000..69048da --- /dev/null +++ b/tests/test_sidecar_manifest.py @@ -0,0 +1,102 @@ +"""Tests for sidecar manifest parsing — Issue #2.""" +import pytest +import tempfile +import os +from pathlib import Path + +from sidecar.manifest import load_manifest, validate_profile + + +class TestValidateProfile: + """Tests for manifest profile validation.""" + + def test_valid_profile(self): + profile = { + "id": "qwen-3-8b", + "name": "Qwen 3 8B", + "model_path": "/home/bigt/AI/llm/qwen/qwen3-8b-q4.gguf", + "flags": {"n_ctx": 8192, "n_gpu_layers": 35}, + } + result = validate_profile(profile) + assert result["id"] == "qwen-3-8b" + assert result["name"] == "Qwen 3 8B" + assert result["model_path"] == "/home/bigt/AI/llm/qwen/qwen3-8b-q4.gguf" + assert result["flags"] == {"n_ctx": 8192, "n_gpu_layers": 35} + + def test_valid_profile_no_flags(self): + profile = {"id": "test-model", "name": "Test", "model_path": "/path/to/model.gguf"} + result = validate_profile(profile) + assert result["id"] == "test-model" + assert result["flags"] == {} + + def test_missing_id_raises(self): + profile = {"name": "Test", "model_path": "/path"} + with pytest.raises(ValueError, match="Missing required field: id"): + validate_profile(profile) + + def test_missing_name_raises(self): + profile = {"id": "test", "model_path": "/path"} + with pytest.raises(ValueError, match="Missing required field: name"): + validate_profile(profile) + + def test_missing_model_path_raises(self): + profile = {"id": "test", "name": "Test"} + with pytest.raises(ValueError, match="Missing required field: model_path"): + validate_profile(profile) + + def test_flags_defaults_to_empty_dict(self): + profile = {"id": "test", "name": "Test", "model_path": "/path"} + result = validate_profile(profile) + assert result["flags"] == {} + + +class TestLoadManifest: + """Tests for manifest YAML loading.""" + + def test_empty_manifest_returns_empty_list(self, tmp_path): + manifest_file = tmp_path / "manifest.yaml" + manifest_file.write_text("[]\n") + result = load_manifest(str(manifest_file)) + assert result == [] + + def test_empty_file_returns_empty_list(self, tmp_path): + manifest_file = tmp_path / "manifest.yaml" + manifest_file.write_text("") + result = load_manifest(str(manifest_file)) + assert result == [] + + def test_valid_manifest(self, tmp_path): + manifest_file = tmp_path / "manifest.yaml" + manifest_file.write_text( + "- id: qwen-3-8b\n" + " name: \"Qwen 3 8B\"\n" + " model_path: /home/bigt/AI/llm/qwen/qwen3-8b-q4.gguf\n" + " flags:\n" + " n_ctx: 8192\n" + " n_gpu_layers: 35\n" + "- id: qwen-3-8b-long\n" + " name: \"Qwen 3 8B (Long Context)\"\n" + " model_path: /home/bigt/AI/llm/qwen/qwen3-8b-q4.gguf\n" + " flags:\n" + " n_ctx: 32768\n" + " n_gpu_layers: 20\n" + ) + result = load_manifest(str(manifest_file)) + assert len(result) == 2 + assert result[0]["id"] == "qwen-3-8b" + assert result[1]["name"] == "Qwen 3 8B (Long Context)" + assert result[1]["flags"]["n_ctx"] == 32768 + + def test_invalid_yaml_returns_none(self, tmp_path): + manifest_file = tmp_path / "manifest.yaml" + manifest_file.write_text("{{{{invalid yaml:::\n") + result = load_manifest(str(manifest_file)) + assert result is None + + def test_non_existent_file_returns_none(self, tmp_path): + result = load_manifest(str(tmp_path / "nonexistent.yaml")) + assert result is None + + def test_file_does_not_exist_returns_none(self): + result = load_manifest("/tmp/does_not_exist_12345.yaml") + assert result is None diff --git a/tests/test_sidecar_switch.py b/tests/test_sidecar_switch.py new file mode 100644 index 0000000..33feb69 --- /dev/null +++ b/tests/test_sidecar_switch.py @@ -0,0 +1,105 @@ +"""Tests for sidecar model switch — Issue #3.""" +import pytest +from unittest.mock import patch, AsyncMock, MagicMock +from httpx import Response +from fastapi.testclient import TestClient + +from sidecar.app import app as sidecar_app + + +@pytest.fixture(autouse=True) +def reset_sidecar_state(): + """Reset shared sidecar state between tests.""" + from sidecar.app import _active_profile, _llama_server_process + import sidecar.app + old_active = sidecar.app._active_profile + old_proc = sidecar.app._llama_server_process + sidecar.app._active_profile = None + sidecar.app._llama_server_process = None + yield + sidecar.app._active_profile = old_active + sidecar.app._llama_server_process = old_proc + + +@pytest.fixture +def tmp_manifest(tmp_path): + manifest_file = tmp_path / "manifest.yaml" + manifest_file.write_text( + "- id: qwen-3-8b\n" + " name: \"Qwen 3 8B\"\n" + " model_path: /home/bigt/AI/llm/qwen/qwen3-8b-q4.gguf\n" + " flags:\n" + " n_ctx: 8192\n" + " n_gpu_layers: 35\n" + "- id: llama-4-maverick\n" + " name: \"Llama 4 Maverick\"\n" + " model_path: /home/bigt/AI/llm/llama4/llama4-maverick-q4.gguf\n" + ) + return manifest_file + + +class TestSwitchEndpoint: + """Tests for POST /models/switch.""" + + def test_switch_to_new_profile(self, tmp_manifest): + """Switching to a new profile starts llama-server and waits for readiness.""" + with patch("sidecar.app.MANIFEST_PATH", str(tmp_manifest)), \ + patch("sidecar.app._start_llama_server", new_callable=AsyncMock), \ + patch("sidecar.app._poll_llama_server_ready", return_value=True): + client = TestClient(sidecar_app) + response = client.post("/models/switch", json={"profile_id": "qwen-3-8b"}) + assert response.status_code == 200 + data = response.json() + assert data["status"] == "ready" + assert data["active_profile"] == "qwen-3-8b" + + def test_switch_profile_not_found(self, tmp_manifest): + """Switching to a non-existent profile returns 404.""" + with patch("sidecar.app.MANIFEST_PATH", str(tmp_manifest)): + client = TestClient(sidecar_app) + response = client.post("/models/switch", json={"profile_id": "nonexistent"}) + assert response.status_code == 404 + data = response.json() + assert data["status"] == "error" + assert "not found" in data["message"] + + def test_switch_returns_error_when_unready(self, tmp_manifest): + """If llama-server doesn't become ready, switch returns error.""" + with patch("sidecar.app.MANIFEST_PATH", str(tmp_manifest)), \ + patch("sidecar.app._start_llama_server", new_callable=AsyncMock), \ + patch("sidecar.app._poll_llama_server_ready", return_value=False): + client = TestClient(sidecar_app) + response = client.post("/models/switch", json={"profile_id": "qwen-3-8b"}) + assert response.status_code == 500 + data = response.json() + assert data["status"] == "error" + + def test_switch_when_already_running_same_profile(self, tmp_manifest): + """Already running this profile — returns ready immediately.""" + with patch("sidecar.app.MANIFEST_PATH", str(tmp_manifest)), \ + patch("sidecar.app._active_profile", "qwen-3-8b"): + client = TestClient(sidecar_app) + response = client.post("/models/switch", json={"profile_id": "qwen-3-8b"}) + assert response.status_code == 200 + data = response.json() + assert data["status"] == "ready" + assert data["active_profile"] == "qwen-3-8b" + + +class TestStatusEndpoint: + """Tests for GET /models/status after switch.""" + + def test_status_reflects_running_server(self, tmp_manifest): + """After a successful switch, status shows active_profile and running server.""" + mock_process = MagicMock() + mock_process.returncode = None + + with patch("sidecar.app.MANIFEST_PATH", str(tmp_manifest)), \ + patch("sidecar.app._llama_server_process", mock_process), \ + patch("sidecar.app._active_profile", "qwen-3-8b"): + client = TestClient(sidecar_app) + response = client.get("/models/status") + assert response.status_code == 200 + data = response.json() + assert data["active_profile"] == "qwen-3-8b" + assert data["llama_server_running"] is True