#!/usr/bin/env python3
"""WebSocket chat test — group chat + DM flows."""

import asyncio
import json
import sys
import time
import traceback
from pathlib import Path

try:
    import websockets
except ImportError:
    # Convenience bootstrap: install the client library into the running
    # interpreter's environment, then retry the import.
    print("installing websockets...")
    import subprocess

    subprocess.check_call([sys.executable, "-m", "pip", "install", "websockets", "-q"])
    import websockets

# Endpoint of the chat server under test.
WS_URL = "ws://localhost:28889/ws/chat"
# Where the collected log lines are written when run as a script
# (relative to the current working directory).
LOG_PATH = Path("logs/ws_test.log")
# Every line emitted through log(), in order.
RESULTS = []


def log(tag: str, msg: str) -> None:
    """Print a timestamped, tagged line and remember it in RESULTS."""
    ts = time.strftime("%H:%M:%S")
    line = f"[{ts}] [{tag}] {msg}"
    print(line)
    RESULTS.append(line)


async def recv_json(ws, timeout=5):
    """Receive the next frame from *ws* and decode it as JSON.

    Raises:
        asyncio.TimeoutError: no frame arrived within *timeout* seconds.
        json.JSONDecodeError: the frame was not valid JSON.
    """
    raw = await asyncio.wait_for(ws.recv(), timeout=timeout)
    return json.loads(raw)


async def send_json(ws, data):
    """Serialize *data* to JSON and send it over *ws*."""
    await ws.send(json.dumps(data))


def _expect(condition, message):
    """Raise AssertionError(message) unless *condition* is truthy.

    Used instead of the ``assert`` statement so the checks still run when
    the interpreter is started with ``-O`` (which strips asserts).
    """
    if not condition:
        raise AssertionError(message)


def _expect_type(resp, expected):
    """Check that *resp* is a dict whose "type" field equals *expected*."""
    # .get() so a response without "type" is reported as a failed check
    # rather than surfacing as a bare KeyError.
    actual = resp.get("type") if isinstance(resp, dict) else None
    _expect(actual == expected, f"expected {expected}, got {actual}")


async def test_ws():
    """Run the chat scenario end to end against WS_URL.

    Connects two users, then (as user1) creates a group and a DM, joins the
    group, sends a message to each session, requests history, sends an
    unknown action, and leaves the group.

    Raises:
        AssertionError: a response did not have the expected type/content.
        asyncio.TimeoutError: an expected response did not arrive in time.
    """
    log("TEST", "=== WebSocket Chat Test Start ===")

    # Every opened socket is tracked so that a failure part-way through the
    # scenario still closes the connections (see the finally clause).
    sockets = []
    try:
        # NOTE(review): `additional_headers` is the keyword accepted by the
        # newer websockets asyncio client; older releases used
        # `extra_headers` — confirm against the installed version.
        log("WS", "connecting user1...")
        user1 = await websockets.connect(WS_URL, additional_headers={"X-User-ID": "1001"})
        sockets.append(user1)
        resp = await recv_json(user1)
        log("WS", f"user1 connected: {resp}")
        _expect_type(resp, "connected")

        log("WS", "connecting user2...")
        user2 = await websockets.connect(WS_URL, additional_headers={"X-User-ID": "1002"})
        sockets.append(user2)
        resp = await recv_json(user2)
        log("WS", f"user2 connected: {resp}")
        _expect_type(resp, "connected")

        log("TEST", "--- Test 1: Create Group ---")
        await send_json(user1, {"type": "create_group", "name": "test-room"})
        resp = await recv_json(user1)
        log("WS", f"create_group response: {resp}")
        _expect_type(resp, "group_created")
        group_id = resp["sessionId"]
        log("TEST", f"group created with id={group_id}")

        log("TEST", "--- Test 2: Create DM ---")
        await send_json(user1, {"type": "create_dm", "targetId": 1002})
        resp = await recv_json(user1)
        log("WS", f"create_dm response: {resp}")
        _expect_type(resp, "dm_created")
        dm_id = resp["sessionId"]
        log("TEST", f"DM created with id={dm_id}")

        log("TEST", "--- Test 3: Join Group ---")
        await send_json(user1, {"type": "join", "sessionId": group_id})
        # Collect up to two messages after the join request; a timeout just
        # means nothing more is coming.
        msgs = []
        for _ in range(2):
            try:
                r = await recv_json(user1, timeout=3)
            except asyncio.TimeoutError:
                break
            msgs.append(r)
            log("WS", f"join msg: {r}")
        types = {m.get("type") for m in msgs}
        _expect("joined" in types, f"expected 'joined' in {types}")
        log("TEST", f"join received types: {types}")

        log("TEST", "--- Test 4: Send Message in Group ---")
        await send_json(
            user1,
            {"type": "message", "sessionId": group_id, "content": "hello group!"},
        )
        resp = await recv_json(user1)
        log("WS", f"message broadcast: {resp}")
        _expect_type(resp, "message")
        _expect(
            resp.get("content") == "hello group!",
            f"unexpected group message content: {resp.get('content')!r}",
        )

        log("TEST", "--- Test 5: Send DM ---")
        await send_json(
            user1,
            {"type": "message", "sessionId": dm_id, "content": "hello DM!"},
        )
        resp = await recv_json(user1)
        log("WS", f"DM message: {resp}")
        _expect_type(resp, "message")
        _expect(
            resp.get("content") == "hello DM!",
            f"unexpected DM content: {resp.get('content')!r}",
        )

        log("TEST", "--- Test 6: Message History ---")
        await send_json(user1, {"type": "history", "sessionId": group_id})
        resp = await recv_json(user1)
        # `or []` keeps the log line from crashing on a null/missing "data".
        history = resp.get("data") or []
        log("WS", f"history response: type={resp.get('type')} data_len={len(history)}")
        _expect_type(resp, "history")

        log("TEST", "--- Test 7: Invalid Message ---")
        await send_json(user1, {"type": "unknown_action"})
        resp = await recv_json(user1)
        log("WS", f"error response: {resp}")
        _expect_type(resp, "error")

        log("TEST", "--- Test 8: Leave Group ---")
        # Fire-and-forget: no response to "leave" is awaited or checked.
        await send_json(user1, {"type": "leave", "sessionId": group_id})
    finally:
        # Close in the order the sockets were opened (user1, then user2),
        # on success and on failure alike.
        for ws in sockets:
            await ws.close()

    log("TEST", "=== WebSocket Chat Test PASSED ===")


async def main():
    """Run test_ws() and return a process exit code (0 = pass, 1 = fail).

    Any exception from the scenario is logged, together with its traceback,
    under the FAIL tag instead of propagating.
    """
    try:
        await test_ws()
    except Exception as e:
        log("FAIL", f"Test failed: {e}")
        log("FAIL", traceback.format_exc())
        return 1
    return 0


if __name__ == "__main__":
    rc = asyncio.run(main())
    # Create the log directory on demand so a missing "logs/" does not
    # discard the collected results and the exit code.
    LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
    LOG_PATH.write_text("\n".join(RESULTS) + "\n", encoding="utf-8")
    sys.exit(rc)