fix: 更新接口测试脚本
对齐最新接口流程,改用内置管理员账号与网关验证码链路。
This commit is contained in:
+63
-129
@@ -3,7 +3,7 @@
|
|||||||
|
|
||||||
Runs against the local dev docker-compose environment.
|
Runs against the local dev docker-compose environment.
|
||||||
Gateway: http://127.0.0.1:18080
|
Gateway: http://127.0.0.1:18080
|
||||||
Direct service ports used ONLY for setup bypass (admin creation, vcode).
|
Reads verification codes from local Redis only for dev registration flow.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import json
|
import json
|
||||||
@@ -15,10 +15,13 @@ import urllib.request
|
|||||||
import urllib.error
|
import urllib.error
|
||||||
import urllib.parse
|
import urllib.parse
|
||||||
import http.cookiejar
|
import http.cookiejar
|
||||||
|
import os
|
||||||
|
import subprocess
|
||||||
|
|
||||||
GATEWAY = "http://127.0.0.1:18080"
|
GATEWAY = "http://127.0.0.1:18080"
|
||||||
USERS_DIRECT = "http://127.0.0.1:18801"
|
ADMIN_USERNAME = os.getenv("ADMIN_USERNAME", "admin")
|
||||||
EMAIL_DIRECT = "http://127.0.0.1:18809"
|
ADMIN_PASSWORD = os.getenv("ADMIN_PASSWORD", "admin123")
|
||||||
|
REDIS_CONTAINER = os.getenv("REDIS_CONTAINER", "juwan-redis")
|
||||||
|
|
||||||
passed = 0
|
passed = 0
|
||||||
failed = 0
|
failed = 0
|
||||||
@@ -29,6 +32,26 @@ def rand_str(n=8):
|
|||||||
return "".join(random.choices(string.ascii_lowercase + string.digits, k=n))
|
return "".join(random.choices(string.ascii_lowercase + string.digits, k=n))
|
||||||
|
|
||||||
|
|
||||||
|
def read_vcode_from_redis(request_id, scene, account):
|
||||||
|
if not request_id:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
result = subprocess.run(
|
||||||
|
[
|
||||||
|
"docker",
|
||||||
|
"exec",
|
||||||
|
REDIS_CONTAINER,
|
||||||
|
"redis-cli",
|
||||||
|
"GET",
|
||||||
|
f"vcode:{request_id}:{scene}:{account}",
|
||||||
|
],
|
||||||
|
capture_output=True,
|
||||||
|
text=True,
|
||||||
|
timeout=5,
|
||||||
|
)
|
||||||
|
return result.stdout.strip()
|
||||||
|
|
||||||
|
|
||||||
class Session:
|
class Session:
|
||||||
"""Minimal cookie-aware HTTP session using stdlib only."""
|
"""Minimal cookie-aware HTTP session using stdlib only."""
|
||||||
|
|
||||||
@@ -120,40 +143,28 @@ def phase0_health(s: Session):
|
|||||||
|
|
||||||
|
|
||||||
# ============================================================
|
# ============================================================
|
||||||
# Phase 1: Registration (bypass vcode via direct email-api)
|
# Phase 1: Registration
|
||||||
# ============================================================
|
# ============================================================
|
||||||
def phase1_register(s: Session, username, email, password):
|
def phase1_register(s: Session, username, email, password):
|
||||||
print("\n=== Phase 1: Register ===")
|
print("\n=== Phase 1: Register ===")
|
||||||
|
|
||||||
# Step 1: send verification code via direct email-api (bypass)
|
# Step 1: send verification code via gateway
|
||||||
print(" [BYPASS] Sending verification code via direct email-api...")
|
|
||||||
code, body, _ = s.post(
|
code, body, _ = s.post(
|
||||||
f"{EMAIL_DIRECT}/api/v1/email/verification-code/send",
|
f"{GATEWAY}/api/v1/email/verification-code/send",
|
||||||
json_body={"email": email, "scene": "register"},
|
json_body={"email": email, "scene": "register"},
|
||||||
|
headers=s.csrf_headers(),
|
||||||
)
|
)
|
||||||
report("POST /email/verification-code/send (direct)", code, body)
|
report("POST /email/verification-code/send (gateway)", code, body)
|
||||||
request_id = body.get("requestId", "")
|
request_id = body.get("requestId", "")
|
||||||
if not request_id:
|
if not request_id:
|
||||||
print(" [ERROR] No requestId returned, cannot register")
|
print(" [ERROR] No requestId returned, cannot register")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Step 2: get the vcode from redis (we can't, so we bypass via direct users-api)
|
# Step 2: read the dev verification code from Redis
|
||||||
# Actually we need the real vcode. Let's try to register via direct users-api
|
|
||||||
# with the requestId header. But we still need the vcode...
|
|
||||||
# The vcode is stored in redis. Let's read it.
|
|
||||||
print(" [BYPASS] Reading vcode from Redis...")
|
print(" [BYPASS] Reading vcode from Redis...")
|
||||||
import subprocess
|
vcode = read_vcode_from_redis(request_id, "register", email)
|
||||||
|
|
||||||
redis_cmd = (
|
|
||||||
f'docker exec juwan-redis redis-cli GET "vcode:{request_id}:register:{email}"'
|
|
||||||
)
|
|
||||||
result = subprocess.run(
|
|
||||||
redis_cmd, shell=True, capture_output=True, text=True, timeout=5
|
|
||||||
)
|
|
||||||
vcode = result.stdout.strip()
|
|
||||||
if not vcode:
|
if not vcode:
|
||||||
print(f" [WARN] Could not read vcode from redis, trying with dummy code")
|
print(" [WARN] No verification code found in Redis")
|
||||||
vcode = "000000"
|
|
||||||
else:
|
else:
|
||||||
print(f" Vcode from Redis: {vcode}")
|
print(f" Vcode from Redis: {vcode}")
|
||||||
|
|
||||||
@@ -224,57 +235,15 @@ def phase2_user(s: Session, user_id):
|
|||||||
|
|
||||||
|
|
||||||
# ============================================================
|
# ============================================================
|
||||||
# Phase 3: Admin setup (bypass) + Verification flow
|
# Phase 3: Built-in Admin + Verification flow
|
||||||
# ============================================================
|
# ============================================================
|
||||||
def phase3_admin_and_verification(
|
def phase3_admin_and_verification(
|
||||||
s_admin: Session,
|
s_admin: Session,
|
||||||
s_user: Session,
|
s_user: Session,
|
||||||
admin_user,
|
admin_user,
|
||||||
admin_pass,
|
admin_pass,
|
||||||
admin_email,
|
|
||||||
admin_id_hint=0,
|
|
||||||
user_name_hint="",
|
|
||||||
):
|
):
|
||||||
print("\n=== Phase 3: Admin Setup & Verification ===")
|
print("\n=== Phase 3: Built-in Admin & Verification ===")
|
||||||
|
|
||||||
# Register admin via direct (bypass)
|
|
||||||
print(" [BYPASS] Registering admin via direct users-api...")
|
|
||||||
admin_s_direct = Session()
|
|
||||||
code, body, _ = admin_s_direct.post(
|
|
||||||
f"{EMAIL_DIRECT}/api/v1/email/verification-code/send",
|
|
||||||
json_body={"email": admin_email, "scene": "register"},
|
|
||||||
)
|
|
||||||
request_id = body.get("requestId", "")
|
|
||||||
|
|
||||||
import subprocess
|
|
||||||
|
|
||||||
redis_cmd = f'docker exec juwan-redis redis-cli GET "vcode:{request_id}:register:{admin_email}"'
|
|
||||||
result = subprocess.run(
|
|
||||||
redis_cmd, shell=True, capture_output=True, text=True, timeout=5
|
|
||||||
)
|
|
||||||
vcode = result.stdout.strip() or "000000"
|
|
||||||
|
|
||||||
code, body, _ = admin_s_direct.post(
|
|
||||||
f"{USERS_DIRECT}/api/v1/auth/register",
|
|
||||||
json_body={
|
|
||||||
"username": admin_user,
|
|
||||||
"email": admin_email,
|
|
||||||
"password": admin_pass,
|
|
||||||
"vcode": vcode,
|
|
||||||
},
|
|
||||||
headers={"X-Request-Id": request_id},
|
|
||||||
)
|
|
||||||
report("Register admin (direct bypass)", code, body)
|
|
||||||
if isinstance(body.get("user"), dict):
|
|
||||||
admin_id_hint = body["user"].get("id", admin_id_hint)
|
|
||||||
|
|
||||||
# Set admin flag via DB
|
|
||||||
print(" [BYPASS] Setting is_admin=true via PostgreSQL...")
|
|
||||||
db_cmd = (
|
|
||||||
f"docker exec juwan-postgres psql -U postgres -d app -c "
|
|
||||||
f"\"UPDATE users SET is_admin=true WHERE username='{admin_user}';\""
|
|
||||||
)
|
|
||||||
subprocess.run(db_cmd, shell=True, capture_output=True, text=True, timeout=5)
|
|
||||||
|
|
||||||
# Login admin via gateway
|
# Login admin via gateway
|
||||||
s_admin.get(f"{GATEWAY}/healthz") # get CSRF
|
s_admin.get(f"{GATEWAY}/healthz") # get CSRF
|
||||||
@@ -284,7 +253,15 @@ def phase3_admin_and_verification(
|
|||||||
json_body={"username": admin_user, "password": admin_pass},
|
json_body={"username": admin_user, "password": admin_pass},
|
||||||
headers=csrf,
|
headers=csrf,
|
||||||
)
|
)
|
||||||
report("Admin login (gateway)", code, body)
|
report("POST /auth/login (admin)", code, body)
|
||||||
|
|
||||||
|
admin_id = 0
|
||||||
|
code, body, _ = s_admin.get(f"{GATEWAY}/api/v1/users/me")
|
||||||
|
report("GET /users/me (admin)", code, body)
|
||||||
|
if isinstance(body.get("user"), dict):
|
||||||
|
admin_id = body["user"].get("id", admin_id)
|
||||||
|
elif body.get("id"):
|
||||||
|
admin_id = body.get("id", admin_id)
|
||||||
|
|
||||||
# User applies for player verification
|
# User applies for player verification
|
||||||
csrf_user = s_user.csrf_headers()
|
csrf_user = s_user.csrf_headers()
|
||||||
@@ -323,52 +300,28 @@ def phase3_admin_and_verification(
|
|||||||
code, body, _ = s_user.get(f"{GATEWAY}/api/v1/users/me/verification")
|
code, body, _ = s_user.get(f"{GATEWAY}/api/v1/users/me/verification")
|
||||||
report("GET /users/me/verification", code, body)
|
report("GET /users/me/verification", code, body)
|
||||||
|
|
||||||
# Admin: list pending verifications (direct bypass - envoy missing /api/v1/admin route)
|
# Admin: list and approve via gateway
|
||||||
code, body, _ = s_admin.get(
|
code, body, _ = s_admin.get(f"{GATEWAY}/api/v1/admin/verifications")
|
||||||
f"{USERS_DIRECT}/api/v1/admin/verifications",
|
report("GET /admin/verifications", code, body)
|
||||||
headers={"x-auth-user-id": str(admin_id_hint), "x-auth-is-admin": "true"},
|
|
||||||
)
|
|
||||||
report("GET /admin/verifications (direct bypass)", code, body)
|
|
||||||
|
|
||||||
verification_ids = []
|
verification_ids = []
|
||||||
if isinstance(body.get("list"), list):
|
if isinstance(body.get("list"), list):
|
||||||
for v in body["list"]:
|
for v in body["list"]:
|
||||||
verification_ids.append((v.get("id"), v.get("role")))
|
verification_ids.append((v.get("id"), v.get("role")))
|
||||||
|
|
||||||
# Admin: approve all (direct bypass)
|
# Admin: approve all
|
||||||
for vid, role in verification_ids:
|
for vid, role in verification_ids:
|
||||||
code, body, _ = s_admin.post(
|
code, body, _ = s_admin.post(
|
||||||
f"{USERS_DIRECT}/api/v1/admin/verifications/{vid}/approve",
|
f"{GATEWAY}/api/v1/admin/verifications/{vid}/approve",
|
||||||
json_body={},
|
json_body={},
|
||||||
headers={"x-auth-user-id": str(admin_id_hint), "x-auth-is-admin": "true"},
|
headers=s_admin.csrf_headers(),
|
||||||
)
|
)
|
||||||
report(
|
report(
|
||||||
f"POST /admin/verifications/{vid}/approve ({role}) (direct bypass)",
|
f"POST /admin/verifications/{vid}/approve ({role})",
|
||||||
code,
|
code,
|
||||||
body,
|
body,
|
||||||
)
|
)
|
||||||
|
|
||||||
# BUG WORKAROUND: SearchUserVerifications filters by user_id=0 (bug in RPC),
|
|
||||||
# so admin sees empty list. Force-approve via DB to unblock downstream tests.
|
|
||||||
import subprocess as _sp
|
|
||||||
|
|
||||||
_sp.run(
|
|
||||||
f'''docker exec juwan-postgres psql -U postgres -d app -c "UPDATE user_verifications SET status='approved', reviewed_at=NOW() WHERE status='pending';"''',
|
|
||||||
shell=True,
|
|
||||||
capture_output=True,
|
|
||||||
text=True,
|
|
||||||
timeout=5,
|
|
||||||
)
|
|
||||||
# Also update user's verified_roles to include player and owner
|
|
||||||
_sp.run(
|
|
||||||
f'''docker exec juwan-postgres psql -U postgres -d app -c "UPDATE users SET verified_roles='{{consumer,player,owner}}' WHERE username='{user_name_hint}';"''',
|
|
||||||
shell=True,
|
|
||||||
capture_output=True,
|
|
||||||
text=True,
|
|
||||||
timeout=5,
|
|
||||||
)
|
|
||||||
print(" [BYPASS] Force-approved verifications and updated verified_roles via DB")
|
|
||||||
|
|
||||||
# User: switch role to player
|
# User: switch role to player
|
||||||
code, body, _ = s_user.post(
|
code, body, _ = s_user.post(
|
||||||
f"{GATEWAY}/api/v1/users/me/switch-role",
|
f"{GATEWAY}/api/v1/users/me/switch-role",
|
||||||
@@ -377,7 +330,7 @@ def phase3_admin_and_verification(
|
|||||||
)
|
)
|
||||||
report("POST /users/me/switch-role (player)", code, body)
|
report("POST /users/me/switch-role (player)", code, body)
|
||||||
|
|
||||||
return verification_ids
|
return admin_id, verification_ids
|
||||||
|
|
||||||
|
|
||||||
def phase4_follow(s: Session, target_user_id):
|
def phase4_follow(s: Session, target_user_id):
|
||||||
@@ -854,9 +807,8 @@ def main():
|
|||||||
user1_name = f"testuser_{suffix}"
|
user1_name = f"testuser_{suffix}"
|
||||||
user1_email = f"testuser_{suffix}@example.com"
|
user1_email = f"testuser_{suffix}@example.com"
|
||||||
user1_pass = "TestPass123!"
|
user1_pass = "TestPass123!"
|
||||||
admin_name = f"admin_{suffix}"
|
admin_name = ADMIN_USERNAME
|
||||||
admin_email = f"admin_{suffix}@example.com"
|
admin_pass = ADMIN_PASSWORD
|
||||||
admin_pass = "AdminPass123!"
|
|
||||||
|
|
||||||
print(f"Test run: user={user1_name}, admin={admin_name}")
|
print(f"Test run: user={user1_name}, admin={admin_name}")
|
||||||
|
|
||||||
@@ -874,29 +826,13 @@ def main():
|
|||||||
|
|
||||||
phase2_user(s_user, user_id)
|
phase2_user(s_user, user_id)
|
||||||
|
|
||||||
phase3_admin_and_verification(
|
admin_id, _ = phase3_admin_and_verification(
|
||||||
s_admin,
|
s_admin,
|
||||||
s_user,
|
s_user,
|
||||||
admin_name,
|
admin_name,
|
||||||
admin_pass,
|
admin_pass,
|
||||||
admin_email,
|
|
||||||
user_name_hint=user1_name,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
admin_id = 0
|
|
||||||
s_admin_check = Session()
|
|
||||||
s_admin_check.get(f"{GATEWAY}/healthz")
|
|
||||||
s_admin_check.post(
|
|
||||||
f"{GATEWAY}/api/v1/auth/login",
|
|
||||||
json_body={"username": admin_name, "password": admin_pass},
|
|
||||||
headers=s_admin_check.csrf_headers(),
|
|
||||||
)
|
|
||||||
code, body, _ = s_admin_check.get(f"{GATEWAY}/api/v1/users/me")
|
|
||||||
if isinstance(body.get("user"), dict):
|
|
||||||
admin_id = body["user"].get("id", body.get("id", 0))
|
|
||||||
elif body.get("id"):
|
|
||||||
admin_id = body["id"]
|
|
||||||
|
|
||||||
if admin_id and user_id:
|
if admin_id and user_id:
|
||||||
phase4_follow(s_user, admin_id)
|
phase4_follow(s_user, admin_id)
|
||||||
|
|
||||||
@@ -916,19 +852,17 @@ def main():
|
|||||||
consumer_pass = "ConsumerPass123!"
|
consumer_pass = "ConsumerPass123!"
|
||||||
s_consumer.get(f"{GATEWAY}/healthz")
|
s_consumer.get(f"{GATEWAY}/healthz")
|
||||||
|
|
||||||
c_direct = Session()
|
code, body, _ = s_consumer.post(
|
||||||
code, body, _ = c_direct.post(
|
f"{GATEWAY}/api/v1/email/verification-code/send",
|
||||||
f"{EMAIL_DIRECT}/api/v1/email/verification-code/send",
|
|
||||||
json_body={"email": consumer_email, "scene": "register"},
|
json_body={"email": consumer_email, "scene": "register"},
|
||||||
|
headers=s_consumer.csrf_headers(),
|
||||||
)
|
)
|
||||||
|
report("POST /email/verification-code/send (consumer)", code, body)
|
||||||
c_request_id = body.get("requestId", "")
|
c_request_id = body.get("requestId", "")
|
||||||
import subprocess
|
|
||||||
|
|
||||||
redis_cmd = f'docker exec juwan-redis redis-cli GET "vcode:{c_request_id}:register:{consumer_email}"'
|
c_vcode = read_vcode_from_redis(c_request_id, "register", consumer_email)
|
||||||
result = subprocess.run(
|
if not c_vcode:
|
||||||
redis_cmd, shell=True, capture_output=True, text=True, timeout=5
|
print(" [WARN] No consumer verification code found in Redis")
|
||||||
)
|
|
||||||
c_vcode = result.stdout.strip() or "000000"
|
|
||||||
|
|
||||||
csrf_c = s_consumer.csrf_headers()
|
csrf_c = s_consumer.csrf_headers()
|
||||||
csrf_c["X-Request-Id"] = c_request_id
|
csrf_c["X-Request-Id"] = c_request_id
|
||||||
|
|||||||
Reference in New Issue
Block a user