Files
juwan-backend/app/chat/test/test_ws.py
T
zetaloop 5348966633 fix: 调整 chat WS/WT dev 接入
WT 目前沿用 JToken 的 JWT 校验;撤销一致性留到后续 WT 专用网关设计。
2026-04-25 06:54:00 +08:00

126 lines
4.2 KiB
Python

#!/usr/bin/env python3
"""WebSocket chat test — group chat + DM flows."""
import asyncio
import json
import sys
import time
try:
import websockets
except ImportError:
print("installing websockets...")
import subprocess
subprocess.check_call([sys.executable, "-m", "pip", "install", "websockets", "-q"])
import websockets
WS_URL = "ws://localhost:28888/ws/chat"
RESULTS = []
def log(tag, msg):
ts = time.strftime("%H:%M:%S")
line = f"[{ts}] [{tag}] {msg}"
print(line)
RESULTS.append(line)
async def recv_json(ws, timeout=5):
raw = await asyncio.wait_for(ws.recv(), timeout=timeout)
return json.loads(raw)
async def send_json(ws, data):
await ws.send(json.dumps(data))
async def test_ws():
log("TEST", "=== WebSocket Chat Test Start ===")
log("WS", "connecting user1...")
user1 = await websockets.connect(WS_URL, additional_headers={"x-auth-user-id": "1001"})
resp = await recv_json(user1)
log("WS", f"user1 connected: {resp}")
assert resp["type"] == "connected", f"expected connected, got {resp['type']}"
log("WS", "connecting user2...")
user2 = await websockets.connect(WS_URL, additional_headers={"x-auth-user-id": "1002"})
resp = await recv_json(user2)
log("WS", f"user2 connected: {resp}")
assert resp["type"] == "connected"
log("TEST", "--- Test 1: Create Group ---")
await send_json(user1, {"type": "create_group", "name": "test-room"})
resp = await recv_json(user1)
log("WS", f"create_group response: {resp}")
assert resp["type"] == "group_created", f"expected group_created, got {resp['type']}"
group_id = resp["sessionId"]
log("TEST", f"group created with id={group_id}")
log("TEST", "--- Test 2: Create DM ---")
await send_json(user1, {"type": "create_dm", "targetId": 1002})
resp = await recv_json(user1)
log("WS", f"create_dm response: {resp}")
assert resp["type"] == "dm_created", f"expected dm_created, got {resp['type']}"
dm_id = resp["sessionId"]
log("TEST", f"DM created with id={dm_id}")
log("TEST", "--- Test 3: Join Group ---")
await send_json(user1, {"type": "join", "sessionId": group_id})
msgs = []
for _ in range(2):
try:
r = await recv_json(user1, timeout=3)
msgs.append(r)
log("WS", f"join msg: {r}")
except asyncio.TimeoutError:
break
types = {m["type"] for m in msgs}
assert "joined" in types, f"expected 'joined' in {types}"
log("TEST", f"join received types: {types}")
log("TEST", "--- Test 4: Send Message in Group ---")
await send_json(user1, {"type": "message", "sessionId": group_id, "content": "hello group!"})
resp = await recv_json(user1)
log("WS", f"message broadcast: {resp}")
assert resp["type"] == "message", f"expected message, got {resp['type']}"
assert resp["content"] == "hello group!"
log("TEST", "--- Test 5: Send DM ---")
await send_json(user1, {"type": "message", "sessionId": dm_id, "content": "hello DM!"})
resp = await recv_json(user1)
log("WS", f"DM message: {resp}")
assert resp["type"] == "message"
assert resp["content"] == "hello DM!"
log("TEST", "--- Test 6: Message History ---")
await send_json(user1, {"type": "history", "sessionId": group_id})
resp = await recv_json(user1)
log("WS", f"history response: type={resp['type']} data_len={len(resp.get('data', []))}")
assert resp["type"] == "history"
log("TEST", "--- Test 7: Invalid Message ---")
await send_json(user1, {"type": "unknown_action"})
resp = await recv_json(user1)
log("WS", f"error response: {resp}")
assert resp["type"] == "error"
log("TEST", "--- Test 8: Leave Group ---")
await send_json(user1, {"type": "leave", "sessionId": group_id})
await user1.close()
await user2.close()
log("TEST", "=== WebSocket Chat Test PASSED ===")
async def main():
try:
await test_ws()
return 0
except Exception as e:
log("FAIL", f"Test failed: {e}")
import traceback
log("FAIL", traceback.format_exc())
return 1
if __name__ == "__main__":
rc = asyncio.run(main())
with open("logs/ws_test.log", "w") as f:
f.write("\n".join(RESULTS) + "\n")
sys.exit(rc)