2026-06-15 04:13:36 +03:00
|
|
|
"""Tests for SSE switch progress feedback — Issue #5.
|
|
|
|
|
|
|
|
|
|
SSE events emitted during model switch, phase progression visible.
|
|
|
|
|
"""
|
|
|
|
|
import asyncio
|
|
|
|
|
import json
|
|
|
|
|
import pytest
|
|
|
|
|
from unittest.mock import patch
|
|
|
|
|
from httpx import Response, ASGITransport, AsyncClient
|
|
|
|
|
|
|
|
|
|
from main import app as router_app
|
|
|
|
|
|
2026-06-15 16:16:47 +03:00
|
|
|
SIDECAR_URL = "http://localhost:8080"
|
2026-06-15 04:13:36 +03:00
|
|
|
MAIN_PC_URL = "http://localhost:8080"
|
|
|
|
|
FALLBACK_URL = "http://localhost:9999"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_sse_format():
|
|
|
|
|
"""SSE events are properly formatted."""
|
|
|
|
|
from main import _sse_format
|
|
|
|
|
|
|
|
|
|
event = _sse_format("model_switching", {"phase": "stopping", "message": "Stopping..."})
|
|
|
|
|
assert "event: model_switching" in event
|
|
|
|
|
assert '"phase": "stopping"' in event
|
|
|
|
|
assert '"message": "Stopping..."' in event
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_sse_progress_stream_yields_events():
|
|
|
|
|
"""SSE progress stream yields events during switch."""
|
|
|
|
|
from main import sse_progress_stream
|
|
|
|
|
|
|
|
|
|
async def run_test():
|
|
|
|
|
event = asyncio.Event() # Not set — simulates ongoing switch
|
|
|
|
|
events = []
|
|
|
|
|
async for sse_chunk in sse_progress_stream(event):
|
|
|
|
|
events.append(sse_chunk)
|
|
|
|
|
# Stop after a few events to avoid long waits
|
|
|
|
|
if len(events) >= 4:
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
assert len(events) >= 2
|
|
|
|
|
# Verify events are SSE-formatted
|
|
|
|
|
for sse in events:
|
|
|
|
|
assert "event: model_switching" in sse
|
|
|
|
|
|
|
|
|
|
asyncio.run(run_test())
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_sse_progress_stream_completes_on_set():
|
|
|
|
|
"""SSE stream yields completion event when switch finishes."""
|
|
|
|
|
from main import sse_progress_stream
|
|
|
|
|
|
|
|
|
|
async def run_test():
|
|
|
|
|
event = asyncio.Event()
|
|
|
|
|
event.set() # Already complete
|
|
|
|
|
chunks = []
|
|
|
|
|
async for sse_chunk in sse_progress_stream(event):
|
|
|
|
|
chunks.append(sse_chunk)
|
|
|
|
|
if len(chunks) >= 5:
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
assert len(chunks) >= 1
|
|
|
|
|
# Should include completion event
|
|
|
|
|
has_complete = any('"phase": "complete"' in c for c in chunks)
|
|
|
|
|
assert has_complete
|
|
|
|
|
|
|
|
|
|
asyncio.run(run_test())
|