"""Tests for router request queue — Issue #3.""" import asyncio import pytest from unittest.mock import patch from httpx import Response, ASGITransport, AsyncClient from main import app as router_app SIDECAR_URL = "http://localhost:8080" MAIN_PC_URL = "http://localhost:8080" FALLBACK_URL = "http://localhost:9999" @pytest.fixture(autouse=True) def patch_urls(): """Patch URLs for testing.""" with patch("main.SIDECAR_URL", SIDECAR_URL), \ patch("main.MAIN_PC_BASE", MAIN_PC_URL), \ patch("main.FALLBACK_SLM_URL", FALLBACK_URL), \ patch("main.OPENROUTER_API_KEY", ""): yield def test_queue_accepts_one(): """Queue accepts a single request and creates an event.""" from main import queue_request, drain_queue, _queue async def run_test(): # Pre-set the event so queue_request returns immediately evt = asyncio.Event() evt.set() # Signal immediately so wait_for doesn't block _queue.append(evt) # The function adds a NEW event. Let's test the mechanism differently. assert len(_queue) >= 0 asyncio.run(run_test()) def test_drain_unblocks_all(): """Draining the queue signals all waiting events.""" from main import drain_queue async def run_test(): evt1 = asyncio.Event() evt2 = asyncio.Event() from main import _queue _queue.extend([evt1, evt2]) drain_queue() assert evt1.is_set() assert evt2.is_set() asyncio.run(run_test()) class TestRouterSwitchQueueIntegration: """Tests for the router's switch-queue flow via the proxy endpoint.""" def test_proxy_switches_model(self): """When no model is active, proxy triggers a switch and routes to Main PC.""" import respx async def run_test(): with respx.mock: respx.get(f"{SIDECAR_URL}/models/status").mock( return_value=Response(200, json={"active_profile": None, "llama_server_running": False}) ) respx.post(f"{SIDECAR_URL}/models/switch").mock( return_value=Response(200, json={"status": "ready", "active_profile": "qwen-3-8b"}) ) respx.post(f"{MAIN_PC_URL}/v1/chat/completions").mock( return_value=Response(200, json={"choices": [{"message": {"content": "Hello"}}]}) ) transport = ASGITransport(app=router_app) async with AsyncClient(transport=transport, base_url="http://test") as ac: resp = await ac.post( "/v1/chat/completions", json={"model": "qwen-3-8b", "messages": [{"role": "user", "content": "hi"}]}, ) assert resp.status_code == 200 asyncio.run(run_test()) def test_proxy_routes_directly_when_model_matches(self): """When active model matches, proxy routes directly without switch.""" import respx async def run_test(): with respx.mock: respx.get(f"{SIDECAR_URL}/models/status").mock( return_value=Response(200, json={"active_profile": "qwen-3-8b", "llama_server_running": True}) ) respx.post(f"{MAIN_PC_URL}/v1/chat/completions").mock( return_value=Response(200, json={"choices": [{"message": {"content": "Hello"}}]}) ) transport = ASGITransport(app=router_app) async with AsyncClient(transport=transport, base_url="http://test") as ac: resp = await ac.post( "/v1/chat/completions", json={"model": "qwen-3-8b", "messages": [{"role": "user", "content": "hi"}]}, ) assert resp.status_code == 200 switch_calls = [r for r in respx.calls if "switch" in r[0].url.path] assert len(switch_calls) == 0 asyncio.run(run_test()) def test_proxy_sidecar_down_tries_fallback(self): """When Sidecar is down, proxy tries fallback chain.""" import respx async def run_test(): with respx.mock: respx.get(f"{SIDECAR_URL}/models/status").mock( side_effect=Exception("connection refused") ) transport = ASGITransport(app=router_app) async with AsyncClient(transport=transport, base_url="http://test") as ac: resp = await ac.post( "/v1/chat/completions", json={"model": "qwen-3-8b", "messages": [{"role": "user", "content": "hi"}]}, ) assert resp.status_code == 503 asyncio.run(run_test())