intelligence-router/tests/test_router_queue.py

125 lines
4.7 KiB
Python
Raw Permalink Normal View History

"""Tests for router request queue — Issue #3."""
import asyncio
import pytest
from unittest.mock import patch
from httpx import Response, ASGITransport, AsyncClient
from main import app as router_app
# Base URLs that the ``patch_urls`` fixture substitutes into ``main`` for the
# duration of each test.
# NOTE: the sidecar and Main PC URLs are the same host and port here, so the
# mocked routes in this module are told apart only by their paths.
SIDECAR_URL = "http://localhost:8080"
MAIN_PC_URL = "http://localhost:8080"
# Distinct port for the fallback endpoint; no test in this module mocks it.
FALLBACK_URL = "http://localhost:9999"
@pytest.fixture(autouse=True)
def patch_urls():
    """Point the router's upstream settings at local test endpoints.

    Runs automatically around every test in this module. While a test is
    executing, ``main.SIDECAR_URL``, ``main.MAIN_PC_BASE`` and
    ``main.FALLBACK_SLM_URL`` are replaced with this module's URL constants
    and ``main.OPENROUTER_API_KEY`` is replaced with an empty string. The
    original values are restored when the test finishes.
    """
    overrides = {
        "SIDECAR_URL": SIDECAR_URL,
        "MAIN_PC_BASE": MAIN_PC_URL,
        "FALLBACK_SLM_URL": FALLBACK_URL,
        "OPENROUTER_API_KEY": "",
    }
    # One patcher for all four module attributes instead of four chained ones.
    with patch.multiple("main", **overrides):
        yield
def test_queue_accepts_one():
    """The router's queue holds an appended event as its newest entry.

    ``queue_request`` is only checked for being importable and callable; it
    is not invoked, because (per the original note in this test) it registers
    its own new event rather than reusing one supplied by the caller. The
    test therefore exercises the underlying ``_queue`` container directly.
    """
    from main import _queue, queue_request

    async def run_test():
        # Created inside the coroutine so the event belongs to the loop that
        # asyncio.run() starts.
        evt = asyncio.Event()
        # Pre-signalled, as in the original test, so nothing waiting on this
        # event could block.
        evt.set()
        _queue.append(evt)
        try:
            # Replaces the former `len(_queue) >= 0` check, which could
            # never fail.
            assert _queue[-1] is evt
        finally:
            # Do not leak the event into the module-global queue that other
            # tests share.
            if evt in _queue:
                _queue.remove(evt)

    assert callable(queue_request)
    asyncio.run(run_test())
def test_drain_unblocks_all():
    """Draining the queue signals every event that was waiting in it."""
    from main import _queue, drain_queue

    async def scenario():
        # Two unsignalled events stand in for two queued requests.
        waiters = [asyncio.Event(), asyncio.Event()]
        _queue.extend(waiters)

        drain_queue()

        for waiter in waiters:
            assert waiter.is_set()

    asyncio.run(scenario())
class TestRouterSwitchQueueIntegration:
    """Tests for the router's switch-queue flow via the proxy endpoint.

    Each test mocks the upstream HTTP endpoints with ``respx`` and then sends
    one chat-completion request to the router app in-process through
    ``ASGITransport``.
    """

    @staticmethod
    async def _post_chat():
        """POST a minimal ``qwen-3-8b`` chat request to the router app.

        Must be awaited while the caller's ``respx`` mock is active so that
        the router's outbound requests hit the mocked routes.

        Returns:
            The ``httpx`` response produced by the router app.
        """
        transport = ASGITransport(app=router_app)
        async with AsyncClient(transport=transport, base_url="http://test") as ac:
            return await ac.post(
                "/v1/chat/completions",
                json={
                    "model": "qwen-3-8b",
                    "messages": [{"role": "user", "content": "hi"}],
                },
            )

    def test_proxy_switches_model(self):
        """When no model is active, proxy triggers a switch and routes to Main PC."""
        import respx

        async def run_test():
            with respx.mock:
                # Sidecar reports that nothing is loaded yet.
                respx.get(f"{SIDECAR_URL}/models/status").mock(
                    return_value=Response(
                        200,
                        json={"active_profile": None, "llama_server_running": False},
                    )
                )
                switch_route = respx.post(f"{SIDECAR_URL}/models/switch").mock(
                    return_value=Response(
                        200,
                        json={"status": "ready", "active_profile": "qwen-3-8b"},
                    )
                )
                respx.post(f"{MAIN_PC_URL}/v1/chat/completions").mock(
                    return_value=Response(
                        200,
                        json={"choices": [{"message": {"content": "Hello"}}]},
                    )
                )

                resp = await self._post_chat()

                assert resp.status_code == 200
                # Make the behaviour named in the test title explicit.
                assert switch_route.called, "expected a call to /models/switch"

        asyncio.run(run_test())

    def test_proxy_routes_directly_when_model_matches(self):
        """When active model matches, proxy routes directly without switch."""
        import respx

        async def run_test():
            with respx.mock:
                # Sidecar reports the requested model as already active.
                respx.get(f"{SIDECAR_URL}/models/status").mock(
                    return_value=Response(
                        200,
                        json={
                            "active_profile": "qwen-3-8b",
                            "llama_server_running": True,
                        },
                    )
                )
                respx.post(f"{MAIN_PC_URL}/v1/chat/completions").mock(
                    return_value=Response(
                        200,
                        json={"choices": [{"message": {"content": "Hello"}}]},
                    )
                )

                resp = await self._post_chat()

                assert resp.status_code == 200
                # Inspect the recorded calls before leaving the mock context.
                switch_calls = [
                    call for call in respx.calls if "switch" in call.request.url.path
                ]
                assert not switch_calls, "no /models/switch call was expected"

        asyncio.run(run_test())

    def test_proxy_sidecar_down_tries_fallback(self):
        """When Sidecar is down, proxy tries fallback chain."""
        import respx

        async def run_test():
            with respx.mock:
                # Simulate an unreachable sidecar. No other route is mocked,
                # so the request is expected to end in a 503.
                respx.get(f"{SIDECAR_URL}/models/status").mock(
                    side_effect=Exception("connection refused")
                )

                resp = await self._post_chat()

                assert resp.status_code == 503

        asyncio.run(run_test())