# intelligence-router/sidecar/app.py
# (261 lines, 8.5 KiB, Python)

"""Sidecar FastAPI service — Issue #2 foundation.
Runs on the Main PC, manages llama-server subprocess, serves manifest/profile data.
"""
import os
import asyncio
import signal as signal_module
from contextlib import asynccontextmanager
from typing import Optional
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from sidecar.manifest import load_manifest
# --- Settings (the first two can be overridden through environment variables) ---
# Path of the YAML manifest that is handed to load_manifest().
MANIFEST_PATH = os.environ.get("MANIFEST_PATH", "/home/bigt/AI/llm/manifest.yaml")
# Parsed to int at import time; in the visible code it is only logged at startup
# (presumably the port this service is served on -- confirm against the launcher).
SIDECAR_PORT = int(os.environ.get("SIDECAR_PORT", "8080"))
# Fixed port passed to llama-server via --port and polled for readiness.
LLAMA_SERVER_PORT = 8081
# llama-server's stderr is captured in a file that sits next to the manifest.
LLAMA_STDERR_LOG = os.path.join(os.path.dirname(MANIFEST_PATH), "llama-server-stderr.log")

# --- Mutable module state ---
# Handle of the llama-server child process, or None when none has been started.
_llama_server_process: Optional[asyncio.subprocess.Process] = None
# Id of the profile that last became ready, or None.
_active_profile: Optional[str] = None
# Serialises /models/switch requests; an asyncio.Lock (rather than a threading
# lock) so that waiting for it does not block the event loop.
_switch_lock = asyncio.Lock()
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage the sidecar lifecycle; no model is loaded by default.

    On startup only a log line is printed. On shutdown any llama-server child
    process is terminated so it does not outlive the sidecar.

    Args:
        app: The FastAPI application (unused; part of the lifespan signature).
    """
    print(f"Sidecar starting, manifest={MANIFEST_PATH}, port={SIDECAR_PORT}", flush=True)
    try:
        yield
    finally:
        # Run the cleanup in `finally` so it also happens when an exception or
        # cancellation is thrown into the generator at the `yield`; otherwise
        # the child process would be orphaned. _kill_llama_server() is a no-op
        # when no process is running, so no extra guard is needed here.
        await _kill_llama_server()
# ASGI application object; `lifespan` provides the startup log line and the
# shutdown cleanup of the llama-server child process.
app = FastAPI(lifespan=lifespan)
def _close_stderr_log():
    """Close the stderr log file stashed on the current llama-server process.

    The handle is looked up as the ``_stderr_fh`` attribute of the module-level
    process object. Nothing happens when there is no process, no such
    attribute, or the handle is already closed. Errors raised by ``close()``
    are deliberately ignored (best-effort cleanup).
    """
    proc = _llama_server_process
    if proc is None:
        return
    log_handle = getattr(proc, "_stderr_fh", None)
    if log_handle is None or log_handle.closed:
        return
    try:
        log_handle.close()
    except Exception:
        # Best-effort: a failing close must not break the caller's cleanup.
        pass
async def _kill_llama_server():
    """Stop the llama-server subprocess and wait for it to fully terminate.

    Sequence: send SIGTERM and wait up to 10 s; if the process is still alive,
    send SIGKILL and wait up to 5 s more. Afterwards the stderr log handle is
    closed and the module-level process reference is cleared.

    This MUST be async because ``Process.wait()`` is a coroutine. An earlier
    synchronous version called ``.wait()`` without ``await``, which only
    created an un-awaited coroutine object: the old process was never actually
    waited on, so it could still hold GPU VRAM when the new server started.

    If there is no process, or it has already exited, only the log handle is
    closed and the function returns.
    """
    global _llama_server_process
    proc = _llama_server_process
    if proc is None or proc.returncode is not None:
        _close_stderr_log()
        return
    try:
        proc.send_signal(signal_module.SIGTERM)
        try:
            await asyncio.wait_for(proc.wait(), timeout=10)
        except asyncio.TimeoutError:
            # Graceful shutdown took too long: escalate to SIGKILL.
            proc.kill()
            try:
                await asyncio.wait_for(proc.wait(), timeout=5)
            except asyncio.TimeoutError:
                pass
    except ProcessLookupError:
        # The process exited between the returncode check and the signal.
        pass
    except Exception as exc:
        # Stay best-effort (never raise from cleanup) but leave a trace.
        print(f"Error while stopping llama-server: {exc!r}", flush=True)
    finally:
        # Order matters: _close_stderr_log() finds the handle through the
        # global process reference, so it must run BEFORE the reference is
        # cleared. With the opposite order the handle was never closed.
        _close_stderr_log()
        _llama_server_process = None
def _flag_value(value) -> str:
    """Render a manifest flag value as a llama-server command-line argument.

    Booleans are rendered as ``"on"`` / ``"off"``; every other value is passed
    through ``str()``. Per the original author's note, YAML ``safe_load`` turns
    true/false/on/off/yes/no into Python bools, and llama-server expects
    'on'/'off' for boolean flags rather than 'True'/'False'.
    """
    # Anything that is not a bool is simply stringified.
    if not isinstance(value, bool):
        return str(value)
    if value:
        return "on"
    return "off"
def _flag_key(key: str) -> str:
    """Map a manifest flag key to the llama-server CLI flag name (without ``--``).

    Two steps are applied:
      1. every underscore becomes a hyphen (``n_gpu_layers`` -> ``n-gpu-layers``),
         because YAML keys often use underscores while the CLI uses hyphens;
      2. known renames are applied to the hyphenated name; currently only
         ``n-ctx`` -> ``ctx-size``.
    """
    renamed_flags = {"n-ctx": "ctx-size"}
    hyphenated = "-".join(key.split("_"))
    if hyphenated in renamed_flags:
        return renamed_flags[hyphenated]
    return hyphenated
async def _start_llama_server(profile: dict):
    """Start llama-server with the given profile's configuration.

    Any llama-server that is already running is stopped first. The command
    line is built from ``profile["model_path"]``, the fixed port and host, and
    one ``--<flag> <value>`` pair per entry of ``profile["flags"]``. stdout is
    discarded; stderr goes to LLAMA_STDERR_LOG (truncated on every start).

    Args:
        profile: Manifest profile; must contain ``model_path`` and may contain
            a ``flags`` mapping (a missing or null ``flags`` means no flags).

    Returns:
        The started ``asyncio.subprocess.Process``, also stored in the
        module-level ``_llama_server_process``.

    Raises:
        KeyError: If ``profile`` has no ``model_path``.
        OSError: If the log file cannot be opened or the binary cannot be run.
    """
    global _llama_server_process
    # Kill any existing process
    await _kill_llama_server()
    # Build command from profile flags
    cmd = ["/home/bigt/AI/llama.cpp/build/bin/llama-server"]
    cmd += ["--model", profile["model_path"]]
    cmd += ["--port", str(LLAMA_SERVER_PORT)]
    cmd += ["--host", "0.0.0.0"]
    # `or {}` also covers a manifest that spells out `flags:` with no value.
    for key, value in (profile.get("flags") or {}).items():
        cmd += ["--" + _flag_key(key), _flag_value(value)]
    print(f"Starting llama-server: {' '.join(cmd)}", flush=True)
    # Capture stderr so we can diagnose crashes (model not found, OOM, bad flag)
    stderr_fh = open(LLAMA_STDERR_LOG, "w")
    try:
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.DEVNULL,
            stderr=stderr_fh,
        )
    except BaseException:
        # The spawn failed (or was cancelled): nothing owns the handle yet,
        # so close it here instead of leaking it.
        stderr_fh.close()
        raise
    # Keep a reference so we can close the handle later
    process._stderr_fh = stderr_fh  # type: ignore[attr-defined]
    _llama_server_process = process
    return process
async def _poll_llama_server_ready(max_retries: int = 60, interval: float = 0.5):
    """Poll llama-server readiness via its /v1/models endpoint.

    Args:
        max_retries: Maximum number of polling attempts.
        interval: Seconds to sleep between attempts.

    Returns:
        True as soon as the endpoint answers with HTTP 200. False when the
        retries are exhausted or the llama-server process has exited; in that
        case the captured stderr (if any) and the exit code are printed so the
        user can see why llama-server did not come up.
    """
    import httpx

    url = f"http://localhost:{LLAMA_SERVER_PORT}/v1/models"
    # One client for the whole polling loop instead of one per attempt.
    async with httpx.AsyncClient(timeout=2.0) as client:
        for _ in range(max_retries):
            try:
                resp = await client.get(url)
                if resp.status_code == 200:
                    return True
            except httpx.HTTPError:
                # Connection refused / timeout while the server is still loading.
                pass
            proc = _llama_server_process
            if proc is not None and proc.returncode is not None:
                # The process already died; further polling cannot succeed.
                break
            await asyncio.sleep(interval)
    # Close our stderr handle before reading the log back
    _close_stderr_log()
    # ── Dump stderr for diagnosis ──────────────────────────────────────
    print("llama-server did NOT become ready — dumping stderr:", flush=True)
    try:
        # errors="replace": undecodable bytes must not break the diagnostics.
        with open(LLAMA_STDERR_LOG, errors="replace") as f:
            for line in f:
                print(f" {line.rstrip()}", flush=True)
    except FileNotFoundError:
        print(" (stderr log not found — process may not have started)", flush=True)
    # Also log exit code if the process died
    proc = _llama_server_process
    if proc and proc.returncode is not None:
        print(
            f"llama-server exited with code {proc.returncode}",
            flush=True,
        )
    return False
@app.get("/models/available")
async def get_available_models():
    """Read manifest YAML and return list of profiles."""
    # The manifest is re-read on every request. load_manifest() returning None
    # is treated as a parse failure and reported as HTTP 500.
    manifest_profiles = load_manifest(MANIFEST_PATH)
    if manifest_profiles is not None:
        return manifest_profiles
    raise HTTPException(status_code=500, detail="Failed to parse manifest YAML")
@app.get("/models/status")
async def get_models_status():
    """Return current model status."""
    # "Running" means a process handle exists and no exit code has been
    # recorded for it yet.
    proc = _llama_server_process
    is_running = proc is not None and proc.returncode is None
    return {
        "active_profile": _active_profile,
        "llama_server_running": is_running,
    }
# Request body of POST /models/switch.
class SwitchRequest(BaseModel):
    # Id of the manifest profile to activate; matched against each profile's "id".
    profile_id: str
@app.post("/models/switch")
async def switch_model(payload: SwitchRequest):
    """Stop current llama-server, start new one with the given profile, wait for readiness."""
    # Responses:
    #   200 {"status": "ready", "active_profile": <id>}  profile is up
    #   404 {"status": "error", ...}                     unknown profile_id
    #   500 {"status": "error", ...}                     manifest unreadable, start
    #                                                    failed, or never became ready
    # The whole operation runs under _switch_lock so that concurrent requests
    # cannot interleave their stop/start sequences.
    global _active_profile
    async with _switch_lock:
        # Validate profile_id
        profiles = load_manifest(MANIFEST_PATH)
        if profiles is None:
            return JSONResponse(
                status_code=500,
                content={"status": "error", "message": "Failed to load manifest"},
            )
        # .get() so that a malformed profile without an "id" is skipped
        # instead of raising KeyError.
        profile = next(
            (p for p in profiles if p.get("id") == payload.profile_id),
            None,
        )
        if profile is None:
            return JSONResponse(
                status_code=404,
                content={"status": "error", "message": f"Profile '{payload.profile_id}' not found"},
            )
        # Already running this profile: only short-circuit when the process is
        # really still alive; if it crashed, fall through and restart it.
        server_alive = (
            _llama_server_process is not None
            and _llama_server_process.returncode is None
        )
        if _active_profile == payload.profile_id and server_alive:
            return {
                "status": "ready",
                "active_profile": _active_profile,
            }
        # Start the new model
        await _kill_llama_server()
        _active_profile = None
        try:
            await _start_llama_server(profile)
        except (OSError, KeyError) as exc:
            # Binary or log file not usable, or the profile lacks "model_path":
            # answer with the same structured error shape as the other failures.
            return JSONResponse(
                status_code=500,
                content={"status": "error", "message": f"Failed to start llama-server: {exc}"},
            )
        # Poll for readiness
        ready = await _poll_llama_server_ready()
        if ready:
            _active_profile = payload.profile_id
            return {
                "status": "ready",
                "active_profile": _active_profile,
            }
        # Not ready: do not leave a half-started server behind holding
        # resources while no profile is reported as active.
        await _kill_llama_server()
        _active_profile = None
        return JSONResponse(
            status_code=500,
            content={"status": "error", "message": "llama-server failed to become ready"},
        )