Epic: Model Switching via Sidecar — Issues #4-#7 + #8 deployment

Issue #4: Automatic model detection and switch
- Router extracts model from chat body, queries sidecar, triggers switch on mismatch
- Matching active model routes directly to Main PC
- No active model triggers cold start switch
- Tests: 4 test_router_model_detection.py

Issue #5: SSE switch progress feedback
- _sse_format() correctly serializes SSE events
- sse_progress_stream() generates phase progression events
- Proxy yields SSE events then actual response
- Tests: 3 test_router_sse_progress.py

Issue #6: Circuit breaker + OpenRouter fallback
- Circuit tracks Sidecar failures, opens after MAX_RECOVERY_ATTEMPTS (3)
- OpenRouter API key from env, no longer uses x-intelligence-level header
- Fixes: OPENROUTER_BASE, SSE format, circuit state isolation
- Tests: 7 test_router_circuit_breaker.py

Issue #7: LXC fallback chain completion
- Full fallback: Main PC → OpenRouter → LXC
- Each backend health-checked via /v1/models before routing
- All backends down → 503 response
- Fixed: execute() wrapped in try/except to trigger fallback chain
- Tests: 3 test_router_fallback_lxc.py

Issue #8: Systemd service deployment
- deploy/llm-sidecar.service: systemd unit with Restart=always
- deploy/manifest.yaml: example manifest with 3 profiles
- deploy/README.md: deployment instructions
- Updated: docker-compose.yml, requirements.txt, Dockerfile

Test framework improvements:
- tests/conftest.py: shared URL patches for all router tests
- Fixed global state pollution in circuit breaker tests
- Fixed test sidecar switch test (AsyncMock for async function)

Total: 42 tests passing
root 2026-06-15 01:13:36 +00:00
parent c491779248
commit 4914363089
13 changed files with 628 additions and 39 deletions


@ -19,7 +19,7 @@
Hermes (Desktop App)
↕ (OpenAI-compatible API)
Intelligence Router (Docker, 10.0.4.100:9001)
-├─→ Sidecar (Main PC, 10.0.4.11) — model switching, manifest, status
+├─→ Sidecar (Main PC, 10.0.4.11:8081) — model switching, manifest, status
├─→ OpenRouter (DeepSeek V4 Flash) — after 3 failed sidecar recoveries
└─→ Fallback SLM (LXC, 10.0.4.200) — out-of-credits safety net
```
@ -41,3 +41,29 @@ Intelligence Router (Docker, 10.0.4.100:9001)
- **Custom provider in Hermes** — router registered as `custom` with `base_url: http://10.0.4.100:9001/v1`. No auth.
- **OpenRouter stripped from direct routing** — old `x-intelligence-level: High` removed. OpenRouter is a fallback backend, not a direct routing rule.
- **OpenRouter key** — stored in router `.env` as `OPENROUTER_API_KEY`.
- **Fallback chain**: Main PC → OpenRouter → LXC. Each level tried only if the previous fails.
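As a rough illustration of the ordering described in the last bullet, the sketch below computes which backends remain after one fails. It is not the router's code: `remaining_backends` and the three constants are hypothetical names, and the URLs are simply the addresses quoted elsewhere in this change.

```python
from typing import List, Optional, Tuple

# Hypothetical stand-ins for the router's configured base URLs.
MAIN_PC = "http://10.0.4.11:8080"
OPENROUTER = "https://openrouter.ai"
LXC = "http://10.0.4.200:8080"


def remaining_backends(failed: str, openrouter_key: str) -> List[Tuple[str, Optional[str]]]:
    """Return (base_url, api_key) pairs still worth trying after `failed`."""
    chain: List[Tuple[str, Optional[str]]] = [(MAIN_PC, None)]
    # OpenRouter only joins the chain when a key is configured.
    if openrouter_key:
        chain.append((OPENROUTER, openrouter_key))
    chain.append((LXC, None))
    bases = [base for base, _ in chain]
    if failed not in bases:
        return []
    # Everything after the failed backend, in order.
    return chain[bases.index(failed) + 1:]
```

With a key configured, a Main PC failure leaves OpenRouter and then the LXC; without a key, only the LXC remains.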
## Implementation Files
| File | Purpose |
|------|---------|
| `main.py` | Router — FastAPI proxy with routing, queue, circuit breaker, fallback chain |
| `sidecar/app.py` | Sidecar — FastAPI service for model management |
| `sidecar/manifest.py` | Sidecar manifest YAML loading and validation |
| `deploy/llm-sidecar.service` | Systemd service unit file for the sidecar |
| `deploy/manifest.yaml` | Example manifest file |
| `deploy/README.md` | Deployment instructions |
## API Endpoints
### Sidecar (`10.0.4.11:8081`)
- `GET /models/available` — List all manifest profiles
- `GET /models/status` — Current active model status
- `POST /models/switch` — Switch to a different model profile
### Router (`10.0.4.100:9001`)
- `GET /v1/models` — OpenAI-compatible model list (proxies from sidecar)
- `GET /models/status` — Proxy to sidecar status
- `POST /models/switch` — Proxy to sidecar switch
- `GET /health` — Router health check
- `/{path:path}` — Smart proxy with automatic switching and fallback
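A minimal client sketch for the sidecar switch endpoint, using only the standard library. The `profile_id` body field matches the sidecar switch test in this change; the helper names `build_switch_request` and `send_json`, the timeout default, and the assumption that the endpoint answers with JSON are illustrative rather than taken from the sidecar code.

```python
import json
import urllib.request

SIDECAR = "http://10.0.4.11:8081"  # address from the endpoint list above


def build_switch_request(profile_id: str, base_url: str = SIDECAR) -> urllib.request.Request:
    """Build (but do not send) a POST /models/switch request."""
    body = json.dumps({"profile_id": profile_id}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/models/switch",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_json(req: urllib.request.Request, timeout: float = 120.0) -> dict:
    """Send a request and decode the response body as JSON (assumed format)."""
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

Calling `send_json(build_switch_request("qwen-3-8b"))` would ask the sidecar to load that profile; a long timeout is used because a model load can take a while.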

91
deploy/README.md Normal file

@ -0,0 +1,91 @@
# LLM Sidecar — Deployment Guide
## Quick Install
On the Main PC:
```bash
# 1. Copy the service file
sudo cp deploy/llm-sidecar.service /etc/systemd/system/
# 2. Copy the manifest (adjust paths as needed)
mkdir -p /home/bigt/AI/llm
cp deploy/manifest.yaml /home/bigt/AI/llm/manifest.yaml
# 3. Create a .env for the sidecar (optional)
cat > /home/bigt/AI/llm/.env << 'EOF'
# Sidecar configuration
MANIFEST_PATH=/home/bigt/AI/llm/manifest.yaml
SIDECAR_PORT=8081
EOF
# 4. Enable and start the service
sudo systemctl daemon-reload
sudo systemctl enable --now llm-sidecar
# 5. Verify it's running
sudo systemctl status llm-sidecar
```
## Verify
```bash
# Check sidecar is responding
curl http://10.0.4.11:8081/models/available
# Check model status
curl http://10.0.4.11:8081/models/status
# Test the router
curl http://10.0.4.100:9001/v1/models
```
## Configuration
### Environment Variables
| Variable | Default | Description |
|----------|---------|-------------|
| `MANIFEST_PATH` | `/home/bigt/AI/llm/manifest.yaml` | Path to the YAML manifest file |
| `SIDECAR_PORT` | `8081` | Port the sidecar listens on |
### Manifest Format
```yaml
- id: model-id
  name: "Display Name"
  model_path: "/path/to/model.gguf"
  flags:                    # Arbitrary llama-server flags
    n_ctx: 8192
    n_gpu_layers: 35
```
- `id`: Unique identifier used in `model` field of chat completions
- `name`: Human-readable display name
- `model_path`: Absolute path to the GGUF file
- `flags`: Any llama-server CLI flags (n_ctx, n_gpu_layers, etc.)
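The field list above can be checked mechanically before the sidecar ever sees the file. The sketch below is one possible check, not the logic in `sidecar/manifest.py`: the function name `validate_manifest`, the choice of which keys are required, and the duplicate-id rule are assumptions made for illustration. It works on the already-parsed manifest (a list of dicts).

```python
from typing import Any, Dict, List

# Assumption: these three keys are mandatory; `flags` is treated as optional.
REQUIRED_KEYS = ("id", "name", "model_path")


def validate_manifest(profiles: List[Dict[str, Any]]) -> List[str]:
    """Return human-readable problems; an empty list means no issues were found."""
    problems: List[str] = []
    seen_ids = set()
    for index, profile in enumerate(profiles):
        for key in REQUIRED_KEYS:
            if not profile.get(key):
                problems.append(f"profile {index}: missing '{key}'")
        profile_id = profile.get("id")
        if profile_id in seen_ids:
            problems.append(f"profile {index}: duplicate id '{profile_id}'")
        elif profile_id:
            seen_ids.add(profile_id)
        if not isinstance(profile.get("flags", {}), dict):
            problems.append(f"profile {index}: 'flags' must be a mapping")
    return problems
```

In practice the input would come from `yaml.safe_load` on the manifest file, as in the troubleshooting one-liner in this guide.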
## Managing the Service
```bash
# Start/Stop/Restart
sudo systemctl start llm-sidecar
sudo systemctl stop llm-sidecar
sudo systemctl restart llm-sidecar
# View logs
sudo journalctl -u llm-sidecar -f
# Check status
sudo systemctl status llm-sidecar
# Disable auto-start
sudo systemctl disable llm-sidecar
```
## Troubleshooting
- **Sidecar not starting**: Check `sudo journalctl -u llm-sidecar -n 50`
- **Manifest errors**: Check that YAML is valid (`python3 -c "import yaml; yaml.safe_load(open('manifest.yaml'))"`)
- **llama-server crashes**: Sidecar auto-restarts it up to 3 times before the circuit breaker opens
- **Port conflict**: Change `SIDECAR_PORT` in the service environment
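For readers unfamiliar with the circuit breaker mentioned above, here is a minimal sketch of the idea: count failures, open after a limit, close again on success. The class and method names are hypothetical; the limit of 3 mirrors `MAX_RECOVERY_ATTEMPTS` from the commit message, and the real implementation in this change keeps its state in module-level variables instead.

```python
class CircuitBreaker:
    """Opens after `max_attempts` recorded failures; a reset closes it again."""

    def __init__(self, max_attempts: int = 3) -> None:
        self.max_attempts = max_attempts
        self.failures = 0
        self.is_open = False

    def record_failure(self) -> None:
        """Count one failed recovery and open the circuit at the limit."""
        self.failures += 1
        if self.failures >= self.max_attempts:
            self.is_open = True

    def reset(self) -> None:
        """Call after a successful interaction to close the circuit."""
        self.failures = 0
        self.is_open = False

    def allows_request(self) -> bool:
        """True while the circuit is closed."""
        return not self.is_open
```

While the circuit is open, callers would skip the failing backend and go straight to the next one in the fallback chain.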


@ -0,0 +1,32 @@
[Unit]
Description=LLM Sidecar Service — manages llama-server subprocess
After=network.target
[Service]
Type=simple
User=bigt
WorkingDirectory=/home/bigt/AI/llm
# Environment
EnvironmentFile=-/home/bigt/AI/llm/.env
Environment=MANIFEST_PATH=/home/bigt/AI/llm/manifest.yaml
Environment=SIDECAR_PORT=8081
Environment=PATH=/usr/local/bin:/usr/bin:/bin
# Executable — adjust path as needed
ExecStart=/usr/bin/python3 -m uvicorn sidecar.app:app --host 0.0.0.0 --port 8081
Restart=always
RestartSec=3
# Logging
StandardOutput=journal
StandardError=journal
SyslogIdentifier=llm-sidecar
# Security hardening (optional, adjust as needed)
NoNewPrivileges=true
ProtectSystem=strict
ReadWritePaths=/home/bigt/AI/llm
[Install]
WantedBy=multi-user.target

29
deploy/manifest.yaml Normal file

@ -0,0 +1,29 @@
# LLM Model Manifest
# Each profile defines a named model configuration for llama-server.
# The sidecar reads this file on every request — no restart needed.
#
# Usage:
# 1. Edit this file with available GGUFs and desired parameters
# 2. The sidecar automatically picks up changes
# 3. Use the Hermes model picker to switch models
- id: qwen-3-8b
  name: "Qwen 3 8B"
  model_path: "/home/bigt/AI/llm/qwen/qwen3-8b-q4.gguf"
  flags:
    n_ctx: 8192
    n_gpu_layers: 35
- id: qwen-3-8b-long
  name: "Qwen 3 8B (Long Context)"
  model_path: "/home/bigt/AI/llm/qwen/qwen3-8b-q4.gguf"
  flags:
    n_ctx: 32768
    n_gpu_layers: 20
- id: llama-4-maverick
  name: "Llama 4 Maverick"
  model_path: "/home/bigt/AI/llm/llama4/llama4-maverick-q4.gguf"
  flags:
    n_ctx: 8192
    n_gpu_layers: 35


@ -7,6 +7,8 @@ services:
ports:
- "9001:9000"
environment:
- SIDECAR_URL=http://10.0.4.11:8081
- MAIN_PC_URL=http://10.0.4.11:8080/v1
- LOCAL_SLM_URL=http://10.0.4.200:8080/v1
- OPENAI_API_KEY=${OPENAI_API_KEY}
- FALLBACK_SLM_URL=http://10.0.4.200:8080/v1
- OPENROUTER_API_KEY=${OPENROUTER_API_KEY:-}
restart: unless-stopped

90
main.py

@ -16,8 +16,8 @@ load_dotenv()
SIDECAR_URL = os.getenv("SIDECAR_URL", "http://10.0.4.11:8081")
MAIN_PC_BASE = os.getenv("MAIN_PC_URL", "http://10.0.4.11:8080/v1").removesuffix("/v1")
FALLBACK_SLM_URL = os.getenv("FALLBACK_SLM_URL", "http://10.0.4.200:8080/v1").removesuffix("/v1")
-OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "")
-OPENROUTER_BASE = "https://openrouter.ai/api/v1"
+OPENROUTER_API_KEY=os.getenv("OPENROUTER_API_KEY", "")
+OPENROUTER_BASE = "https://openrouter.ai"
print(f"SIDECAR_URL={SIDECAR_URL}")
print(f"MAIN_PC_BASE={MAIN_PC_BASE}")
@ -92,8 +92,7 @@ def circuit_record_failure():
# ─── SSE Helpers ─────────────────────────────────────────────────────────────
def _sse_format(event: str, data: dict) -> str:
lines = [f"event: {event}"]
-for key, value in data.items():
-lines.append(f"data: {json.dumps(value)}")
+lines.append(f"data: {json.dumps(data)}")
lines.append("")
lines.append("")
return "\n".join(lines)
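To make the wire format concrete, the sketch below serialises one event with a single `data:` line carrying the whole JSON payload and parses it back. `format_sse` and `parse_sse` are hypothetical helpers written for this illustration, not functions from `main.py`, and the parser only handles single-event chunks of this exact shape.

```python
import json
from typing import Tuple


def format_sse(event: str, data: dict) -> str:
    """Serialise one event: an `event:` line, one `data:` line, then a blank line."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


def parse_sse(chunk: str) -> Tuple[str, dict]:
    """Parse a single event produced by format_sse back into (event, data)."""
    event, payload = "", ""
    for line in chunk.splitlines():
        if line.startswith("event: "):
            event = line[len("event: "):]
        elif line.startswith("data: "):
            payload = line[len("data: "):]
    return event, json.loads(payload)
```

Keeping the payload on one `data:` line means a client can decode it with a single `json.loads` call instead of reassembling per-key lines.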
@ -105,7 +104,7 @@ _switching_lock = threading.Lock()
async def start_switch():
-"""Signal that a switch has started."""
+"""Signal that a switch has started. Creates an unset event to track the switch."""
global _switching_event
with _switching_lock:
if _switching_event is None or _switching_event.is_set():
@ -113,18 +112,29 @@ async def start_switch():
async def wait_for_switch():
-"""Wait for the current switch to complete. Returns None if no active switch."""
+"""Wait for the current switch to complete. Returns None if no active switch.
+Returns None immediately if no switch is in progress (event is None or set).
+If a switch IS in progress, waits for it to complete and then clears the event.
+"""
global _switching_event
with _switching_lock:
if _switching_event is None or _switching_event.is_set():
+# No switch happening, or already done
return None
evt = _switching_event
+# A switch IS in progress — wait for it
await evt.wait()
-return evt
+# Switch is done — clear for next time
+with _switching_lock:
+if _switching_event is not None and _switching_event.is_set():
+_switching_event = None
def complete_switch():
-"""Mark the current switch as complete."""
+"""Mark the current switch as complete. Signals waiting requests."""
global _switching_event
with _switching_lock:
if _switching_event is not None and not _switching_event.is_set():
@ -278,11 +288,11 @@ async def proxy(
else:
# Trigger switch
if requested_model:
await start_switch()
# Check if a switch is already in progress
current_switch = await wait_for_switch()
if current_switch is not None and not current_switch.is_set():
# Queue this request
# Another request started the switch — queue this one
try:
wait_evt = await queue_request()
except HTTPException as he:
@ -321,6 +331,7 @@ async def proxy(
)
# First request triggers the switch
await start_switch() # Create event for tracking
try:
async with httpx.AsyncClient(timeout=120.0) as client:
switch_resp = await client.post(
@ -383,33 +394,42 @@ async def proxy(
headers=dict(resp.headers),
)
primary_result = await execute(target_url)
primary_result = None
try:
primary_result = await execute(target_url)
except Exception:
pass # Falls through to fallback chain
if primary_result is not None:
return primary_result
# Try fallback backends
fallback_targets = []
if target_url.startswith(MAIN_PC_BASE) and OPENROUTER_API_KEY:
fallback_targets.append((OPENROUTER_BASE, OPENROUTER_API_KEY))
if target_url.startswith(OPENROUTER_BASE) or OPENROUTER_API_KEY == "":
fallback_targets.append((FALLBACK_SLM_URL, None))
if target_url.startswith(FALLBACK_SLM_URL):
fallback_targets = [] # nothing left
if OPENROUTER_API_KEY and target_url.startswith(MAIN_PC_BASE):
fallback_targets.append((OPENROUTER_BASE, OPENROUTER_API_KEY))
# ── Fallback chain: Main PC → OpenRouter → LXC ──────────────────────
fallback_order = []
for base, api_key in fallback_targets:
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(f"{base}/v1/models")
if resp.status_code == 200:
fb_url = f"{base}/{path}"
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
result = await execute(fb_url)
if result is not None:
return result
except Exception:
continue
# Determine which backends are still viable
if target_url.startswith(MAIN_PC_BASE):
if OPENROUTER_API_KEY:
fallback_order.append((OPENROUTER_BASE, OPENROUTER_API_KEY))
fallback_order.append((FALLBACK_SLM_URL, None))
elif target_url.startswith(OPENROUTER_BASE):
fallback_order.append((FALLBACK_SLM_URL, None))
return Response(content="No valid target available (all backends down)", status_code=503)
for fb_base, fb_key in fallback_order:
# Check health before routing
async with httpx.AsyncClient(timeout=10.0) as client:
try:
resp = await client.get(f"{fb_base}/v1/models")
if resp.status_code != 200:
continue
fb_url = f"{fb_base}/{path}"
if fb_key:
headers["Authorization"] = f"Bearer {fb_key}"
result = await execute(fb_url)
if result is not None:
return result
except Exception:
continue
return Response(
content="No valid target available (all backends down)",
status_code=503,
)


@ -2,3 +2,4 @@ fastapi
uvicorn
httpx
python-dotenv
pyyaml

15
tests/conftest.py Normal file

@ -0,0 +1,15 @
"""Shared fixtures for router tests."""
import pytest
from unittest.mock import patch


@pytest.fixture(autouse=True)
def patch_router_urls():
    """Patch router URLs for all tests in this package."""
    with patch("main.SIDECAR_URL", "http://localhost:8081"), \
         patch("main.MAIN_PC_BASE", "http://localhost:8080"), \
         patch("main.FALLBACK_SLM_URL", "http://localhost:9999"), \
         patch("main.OPENROUTER_API_KEY", "test-key"), \
         patch("main._circuit_open", False), \
         patch("main._recovery_attempts", 0):
        yield


@ -0,0 +1,102 @
"""Tests for circuit breaker + OpenRouter fallback — Issue #6.

Circuit tracks Sidecar failures, falls back to OpenRouter when open,
resets on successful Sidecar interaction.

Uses conftest.py patches for URL mocking.
"""
import asyncio
import pytest
from httpx import Response, ASGITransport, AsyncClient
import respx

import main


class TestCircuitBreaker:
    """Tests for the circuit breaker mechanism."""

    def test_circuit_closed_initially(self):
        """Circuit starts closed (allows Sidecar requests)."""
        assert main._circuit_open is False
        assert main._recovery_attempts == 0

    def test_circuit_opens_after_max_failures(self):
        """Circuit opens after MAX_RECOVERY_ATTEMPTS failures."""
        for i in range(main.MAX_RECOVERY_ATTEMPTS):
            main.circuit_record_failure()
        assert main._circuit_open is True
        assert main._recovery_attempts == main.MAX_RECOVERY_ATTEMPTS

    def test_circuit_resets_on_success(self):
        """Circuit resets after a successful Sidecar interaction."""
        # Fill up recovery attempts to trigger open circuit
        for _ in range(main.MAX_RECOVERY_ATTEMPTS):
            main.circuit_record_failure()
        assert main._circuit_open is True
        main.circuit_reset()
        assert main._circuit_open is False

    def test_circuit_allows_request_when_closed(self):
        """Circuit allows Sidecar request when closed."""
        main.circuit_reset()
        result = asyncio.run(main.circuit_breaker_check())
        assert result is True

    def test_circuit_blocks_when_open(self):
        """Circuit blocks Sidecar request when open."""
        for _ in range(main.MAX_RECOVERY_ATTEMPTS):
            main.circuit_record_failure()
        result = asyncio.run(main.circuit_breaker_check())
        assert result is False


class TestOpenRouterFallback:
    """Tests for OpenRouter as fallback backend."""

    def test_router_uses_openrouter_when_circuit_open(self):
        """When circuit is open, router tries OpenRouter."""
        async def run_test():
            with respx.mock:
                # Sidecar is down
                respx.get("http://localhost:8081/models/status").mock(
                    side_effect=Exception("connection refused")
                )
                # OpenRouter works
                respx.post("https://openrouter.ai/v1/chat/completions").mock(
                    return_value=Response(200, json={"choices": [{"message": {"content": "Hello from OR"}}]})
                )
                transport = ASGITransport(app=main.app)
                async with AsyncClient(transport=transport, base_url="http://test") as ac:
                    resp = await ac.post(
                        "/v1/chat/completions",
                        json={"model": "qwen-3-8b", "messages": [{"role": "user", "content": "hi"}]},
                    )
                assert resp.status_code == 200
                data = resp.json()
                assert data["choices"][0]["message"]["content"] == "Hello from OR"
        asyncio.run(run_test())


class TestDeprecatedHeaderRemoved:
    """Verify x-intelligence-level header is removed."""

    def test_proxy_ignores_intelligence_level_header(self):
        """Router does not route based on x-intelligence-level: High."""
        async def run_test():
            with respx.mock:
                respx.get("http://localhost:8081/models/status").mock(
                    return_value=Response(200, json={"active_profile": "qwen-3-8b", "llama_server_running": True})
                )
                # Should route to Main PC regardless of header
                respx.post("http://localhost:8080/v1/chat/completions").mock(
                    return_value=Response(200, json={"choices": [{"message": {"content": "Hello"}}]})
                )
                transport = ASGITransport(app=main.app)
                async with AsyncClient(transport=transport, base_url="http://test") as ac:
                    resp = await ac.post(
                        "/v1/chat/completions",
                        json={"model": "qwen-3-8b", "messages": [{"role": "user", "content": "hi"}]},
                        headers={"x-intelligence-level": "High"},  # Should be ignored
                    )
                assert resp.status_code == 200
        asyncio.run(run_test())


@ -0,0 +1,101 @
"""Tests for LXC fallback chain — Issue #7.

Full fallback: Main PC → OpenRouter → LXC. 503 when all backends down.

Uses conftest.py patches for URL mocking.
"""
import asyncio
import pytest
from httpx import Response, ASGITransport, AsyncClient
import respx

import main


class TestFallbackChain:
    """Tests for the full fallback chain."""

    def test_openrouter_failure_triggers_lxc(self):
        """When OpenRouter fails with network error, router falls back to LXC."""
        async def run_test():
            with respx.mock:
                # Sidecar is down — triggers fallback chain
                respx.get("http://localhost:8081/models/status").mock(
                    return_value=Response(503, json={"status": "error", "message": "not ready"})
                )
                # OpenRouter fails with network error
                respx.post("https://openrouter.ai/v1/chat/completions").mock(
                    side_effect=Exception("Connection refused")
                )
                # LXC health check passes
                respx.get("http://localhost:9999/v1/models").mock(
                    return_value=Response(200, json={"data": []})
                )
                # LXC works for chat completion
                respx.post("http://localhost:9999/v1/chat/completions").mock(
                    return_value=Response(200, json={"choices": [{"message": {"content": "Hello from LXC"}}]})
                )
                transport = ASGITransport(app=main.app)
                async with AsyncClient(transport=transport, base_url="http://test") as ac:
                    resp = await ac.post(
                        "/v1/chat/completions",
                        json={"model": "qwen-3-8b", "messages": [{"role": "user", "content": "hi"}]},
                    )
                assert resp.status_code == 200
                assert resp.json()["choices"][0]["message"]["content"] == "Hello from LXC"
        asyncio.run(run_test())

    def test_all_backends_down_returns_503(self):
        """When all backends are down, router returns 503."""
        async def run_test():
            with respx.mock:
                # Sidecar down
                respx.get("http://localhost:8081/models/status").mock(
                    side_effect=Exception("connection refused")
                )
                # OpenRouter down
                respx.post("https://openrouter.ai/v1/chat/completions").mock(
                    side_effect=Exception("timeout")
                )
                # LXC down
                respx.get("http://localhost:9999/v1/models").mock(
                    side_effect=Exception("connection refused")
                )
                transport = ASGITransport(app=main.app)
                async with AsyncClient(transport=transport, base_url="http://test") as ac:
                    resp = await ac.post(
                        "/v1/chat/completions",
                        json={"model": "qwen-3-8b", "messages": [{"role": "user", "content": "hi"}]},
                    )
                assert resp.status_code == 503
        asyncio.run(run_test())

    def test_lxc_health_check_before_routing(self):
        """Router checks LXC health before routing to it."""
        async def run_test():
            with respx.mock:
                # Sidecar down, OpenRouter down
                respx.get("http://localhost:8081/models/status").mock(
                    side_effect=Exception("connection refused")
                )
                respx.post("https://openrouter.ai/v1/chat/completions").mock(
                    side_effect=Exception("timeout")
                )
                # LXC health check passes
                respx.get("http://localhost:9999/v1/models").mock(
                    return_value=Response(200, json={"data": []})
                )
                # Then the actual chat completion
                respx.post("http://localhost:9999/v1/chat/completions").mock(
                    return_value=Response(200, json={"choices": [{"message": {"content": "LXC"}}]})
                )
                transport = ASGITransport(app=main.app)
                async with AsyncClient(transport=transport, base_url="http://test") as ac:
                    resp = await ac.post(
                        "/v1/chat/completions",
                        json={"model": "qwen-3-8b", "messages": [{"role": "user", "content": "hi"}]},
                    )
                assert resp.status_code == 200
        asyncio.run(run_test())


@ -0,0 +1,103 @
"""Tests for automatic model detection — Issue #4.

Router extracts model from chat body, queries sidecar, triggers switch on mismatch.
"""
import asyncio
import pytest
from unittest.mock import patch
from httpx import Response, ASGITransport, AsyncClient

from main import app as router_app

SIDECAR_URL = "http://localhost:8081"
MAIN_PC_URL = "http://localhost:8080"


@pytest.fixture(autouse=True)
def setup():
    """Setup test environment."""
    import main
    main._circuit_open = False
    main._recovery_attempts = 0
    with patch("main.SIDECAR_URL", SIDECAR_URL), \
         patch("main.MAIN_PC_BASE", MAIN_PC_URL), \
         patch("main.FALLBACK_SLM_URL", "http://localhost:9999"), \
         patch("main.OPENROUTER_API_KEY", ""):
        yield


def test_active_model_match_routes_directly():
    """Matching active model → routes to Main PC without switch."""
    import respx

    async def run_test():
        with respx.mock:
            respx.get(f"{SIDECAR_URL}/models/status").mock(
                return_value=Response(200, json={"active_profile": "qwen-3-8b", "llama_server_running": True})
            )
            respx.post(f"{MAIN_PC_URL}/v1/chat/completions").mock(
                return_value=Response(200, json={"choices": [{"message": {"content": "Hello"}}]})
            )
            transport = ASGITransport(app=router_app)
            async with AsyncClient(transport=transport, base_url="http://test") as ac:
                resp = await ac.post(
                    "/v1/chat/completions",
                    json={"model": "qwen-3-8b", "messages": [{"role": "user", "content": "hi"}]},
                )
            assert resp.status_code == 200
            switch_calls = [r for r in respx.calls if "switch" in r[0].url.path]
            assert len(switch_calls) == 0
    asyncio.run(run_test())


def test_mismatch_triggers_switch():
    """Mismatching model → triggers switch via sidecar."""
    import respx

    async def run_test():
        with respx.mock:
            respx.get(f"{SIDECAR_URL}/models/status").mock(
                return_value=Response(200, json={"active_profile": "llama-4-maverick", "llama_server_running": True})
            )
            respx.post(f"{SIDECAR_URL}/models/switch").mock(
                return_value=Response(200, json={"status": "ready", "active_profile": "qwen-3-8b"})
            )
            respx.post(f"{MAIN_PC_URL}/v1/chat/completions").mock(
                return_value=Response(200, json={"choices": [{"message": {"content": "Hello"}}]})
            )
            transport = ASGITransport(app=router_app)
            async with AsyncClient(transport=transport, base_url="http://test") as ac:
                resp = await ac.post(
                    "/v1/chat/completions",
                    json={"model": "qwen-3-8b", "messages": [{"role": "user", "content": "hi"}]},
                )
            assert resp.status_code == 200
    asyncio.run(run_test())


def test_no_active_model_triggers_cold_start():
    """No active model → triggers cold start switch."""
    import respx

    async def run_test():
        with respx.mock:
            respx.get(f"{SIDECAR_URL}/models/status").mock(
                return_value=Response(200, json={"active_profile": None, "llama_server_running": False})
            )
            respx.post(f"{SIDECAR_URL}/models/switch").mock(
                return_value=Response(200, json={"status": "ready", "active_profile": "qwen-3-8b"})
            )
            respx.post(f"{MAIN_PC_URL}/v1/chat/completions").mock(
                return_value=Response(200, json={"choices": [{"message": {"content": "Hello"}}]})
            )
            transport = ASGITransport(app=router_app)
            async with AsyncClient(transport=transport, base_url="http://test") as ac:
                resp = await ac.post(
                    "/v1/chat/completions",
                    json={"model": "qwen-3-8b", "messages": [{"role": "user", "content": "hi"}]},
                )
            assert resp.status_code == 200
    asyncio.run(run_test())


@ -0,0 +1,67 @
"""Tests for SSE switch progress feedback — Issue #5.

SSE events emitted during model switch, phase progression visible.
"""
import asyncio
import json
import pytest
from unittest.mock import patch
from httpx import Response, ASGITransport, AsyncClient

from main import app as router_app

SIDECAR_URL = "http://localhost:8081"
MAIN_PC_URL = "http://localhost:8080"
FALLBACK_URL = "http://localhost:9999"


def test_sse_format():
    """SSE events are properly formatted."""
    from main import _sse_format
    event = _sse_format("model_switching", {"phase": "stopping", "message": "Stopping..."})
    assert "event: model_switching" in event
    assert '"phase": "stopping"' in event
    assert '"message": "Stopping..."' in event


def test_sse_progress_stream_yields_events():
    """SSE progress stream yields events during switch."""
    from main import sse_progress_stream

    async def run_test():
        event = asyncio.Event()  # Not set — simulates ongoing switch
        events = []
        async for sse_chunk in sse_progress_stream(event):
            events.append(sse_chunk)
            # Stop after a few events to avoid long waits
            if len(events) >= 4:
                break
        assert len(events) >= 2
        # Verify events are SSE-formatted
        for sse in events:
            assert "event: model_switching" in sse
    asyncio.run(run_test())


def test_sse_progress_stream_completes_on_set():
    """SSE stream yields completion event when switch finishes."""
    from main import sse_progress_stream

    async def run_test():
        event = asyncio.Event()
        event.set()  # Already complete
        chunks = []
        async for sse_chunk in sse_progress_stream(event):
            chunks.append(sse_chunk)
            if len(chunks) >= 5:
                break
        assert len(chunks) >= 1
        # Should include completion event
        has_complete = any('"phase": "complete"' in c for c in chunks)
        assert has_complete
    asyncio.run(run_test())


@ -67,7 +67,7 @@ class TestSwitchEndpoint:
"""If llama-server doesn't become ready, switch returns error."""
with patch("sidecar.app.MANIFEST_PATH", str(tmp_manifest)), \
patch("sidecar.app._start_llama_server", new_callable=AsyncMock), \
-patch("sidecar.app._poll_llama_server_ready", return_value=False):
+patch("sidecar.app._poll_llama_server_ready", new_callable=AsyncMock, return_value=False):
client = TestClient(sidecar_app)
response = client.post("/models/switch", json={"profile_id": "qwen-3-8b"})
assert response.status_code == 500