fix: 调整 chat WS/WT dev 接入

WT 目前沿用 JToken 的 JWT 校验;撤销一致性留到后续 WT 专用网关设计。
This commit is contained in:
zetaloop
2026-04-25 06:54:00 +08:00
parent dd3cd24b70
commit 5348966633
14 changed files with 95 additions and 102 deletions
+6 -1
View File
@@ -7,7 +7,7 @@ Hybrid:
Protocol: auto Protocol: auto
Ws: Ws:
Name: chat-ws Name: chat-ws
Addr: :8889 Addr: :8888
Path: /ws/chat Path: /ws/chat
MaxConnections: 10000 MaxConnections: 10000
Auth: Auth:
@@ -19,12 +19,17 @@ Hybrid:
Path: /wt/chat Path: /wt/chat
CertFile: /etc/certs/tls.crt CertFile: /etc/certs/tls.crt
KeyFile: /etc/certs/tls.key KeyFile: /etc/certs/tls.key
Auth:
Enabled: true
FallbackStrategy: auto FallbackStrategy: auto
MaxRetries: 3 MaxRetries: 3
MaxConnections: 10000 MaxConnections: 10000
Auth: Auth:
Enabled: true Enabled: true
WsHeaderName: x-auth-user-id WsHeaderName: x-auth-user-id
WtTokenSource: cookie
WtTokenName: JToken
WtJWTSecret: MGUyMWE3ZDhjMTQ5ZDg1MWViOWU0MGM3OTE2NWVkYTBlOTE5ZWRkZDU1YjYzOGJjOWRiNzM0NTc4NDIyMjlkZQ
Stateless: Stateless:
PollInterval: 100ms PollInterval: 100ms
@@ -64,6 +64,12 @@ func (h *Handler) handleJoin(conn protocol.Connection, msg *WsMessage) error {
func (h *Handler) handleLeave(conn protocol.Connection, msg *WsMessage) error { func (h *Handler) handleLeave(conn protocol.Connection, msg *WsMessage) error {
uid := h.getUserId(conn) uid := h.getUserId(conn)
if uid <= 0 {
return conn.SendJSON(context.Background(), WsResponse{
Type: "error",
Content: "authentication required",
})
}
sessionId := msg.SessionId sessionId := msg.SessionId
if sessionId <= 0 { if sessionId <= 0 {
if sid, ok := conn.Metadata()["sessionId"].(int64); ok { if sid, ok := conn.Metadata()["sessionId"].(int64); ok {
@@ -73,6 +79,12 @@ func (h *Handler) handleLeave(conn protocol.Connection, msg *WsMessage) error {
if sessionId <= 0 { if sessionId <= 0 {
return nil return nil
} }
if !h.svcCtx.Store.IsParticipant(sessionId, uid) {
return conn.SendJSON(context.Background(), WsResponse{
Type: "error",
Content: "not a member of this session",
})
}
session, err := h.svcCtx.Store.GetSession(sessionId) session, err := h.svcCtx.Store.GetSession(sessionId)
if err == nil { if err == nil {
@@ -108,6 +120,12 @@ func (h *Handler) handleMessage(conn protocol.Connection, msg *WsMessage) error
Content: "sessionId is required, join a session first", Content: "sessionId is required, join a session first",
}) })
} }
if !h.svcCtx.Store.IsParticipant(sessionId, uid) {
return conn.SendJSON(context.Background(), WsResponse{
Type: "error",
Content: "not a member of this session",
})
}
msgType := chatcore.MessageType(msg.MsgType) msgType := chatcore.MessageType(msg.MsgType)
if msgType == "" { if msgType == "" {
@@ -151,12 +169,25 @@ func (h *Handler) handleMessage(conn protocol.Connection, msg *WsMessage) error
} }
func (h *Handler) handleHistory(conn protocol.Connection, msg *WsMessage) error { func (h *Handler) handleHistory(conn protocol.Connection, msg *WsMessage) error {
uid := h.getUserId(conn)
if uid <= 0 {
return conn.SendJSON(context.Background(), WsResponse{
Type: "error",
Content: "authentication required",
})
}
if msg.SessionId <= 0 { if msg.SessionId <= 0 {
return conn.SendJSON(context.Background(), WsResponse{ return conn.SendJSON(context.Background(), WsResponse{
Type: "error", Type: "error",
Content: "sessionId is required", Content: "sessionId is required",
}) })
} }
if !h.svcCtx.Store.IsParticipant(msg.SessionId, uid) {
return conn.SendJSON(context.Background(), WsResponse{
Type: "error",
Content: "not a member of this session",
})
}
messages := h.svcCtx.Store.GetMessages(msg.SessionId, 0, 50) messages := h.svcCtx.Store.GetMessages(msg.SessionId, 0, 50)
+16
View File
@@ -109,6 +109,22 @@ func (s *Store) GetSession(id int64) (*Session, error) {
return session, nil return session, nil
} }
func (s *Store) IsParticipant(sessionId, userId int64) bool {
s.mu.RLock()
defer s.mu.RUnlock()
session, ok := s.Sessions[sessionId]
if !ok {
return false
}
for _, p := range session.Participants {
if p == userId {
return true
}
}
return false
}
func (s *Store) ListUserSessions(userId int64, page, limit int) []*Session { func (s *Store) ListUserSessions(userId int64, page, limit int) []*Session {
s.mu.RLock() s.mu.RLock()
defer s.mu.RUnlock() defer s.mu.RUnlock()
+1 -1
View File
@@ -7,7 +7,7 @@ Hybrid:
Protocol: auto Protocol: auto
Ws: Ws:
Name: chat-ws Name: chat-ws
Addr: :28889 Addr: :28888
Path: /ws/chat Path: /ws/chat
MaxConnections: 10000 MaxConnections: 10000
Auth: Auth:
+3 -1
View File
@@ -7,7 +7,7 @@ Hybrid:
Protocol: auto Protocol: auto
Ws: Ws:
Name: chat-ws Name: chat-ws
Addr: :8889 Addr: :8888
Path: /ws/chat Path: /ws/chat
MaxConnections: 10000 MaxConnections: 10000
Auth: Auth:
@@ -19,6 +19,8 @@ Hybrid:
Path: /wt/chat Path: /wt/chat
CertFile: /etc/certs/tls.crt CertFile: /etc/certs/tls.crt
KeyFile: /etc/certs/tls.key KeyFile: /etc/certs/tls.key
Auth:
Enabled: true
FallbackStrategy: auto FallbackStrategy: auto
MaxRetries: 3 MaxRetries: 3
MaxConnections: 10000 MaxConnections: 10000
-1
View File
@@ -6,7 +6,6 @@ services:
container_name: chat-api-test container_name: chat-api-test
ports: ports:
- "28888:8888" - "28888:8888"
- "28889:8889"
- "28443:8443/udp" - "28443:8443/udp"
volumes: volumes:
- ./certs:/etc/certs:ro - ./certs:/etc/certs:ro
+1 -1
View File
@@ -14,7 +14,7 @@ except ImportError:
subprocess.check_call([sys.executable, "-m", "pip", "install", "websockets", "-q"]) subprocess.check_call([sys.executable, "-m", "pip", "install", "websockets", "-q"])
import websockets import websockets
WS_URL = "ws://localhost:28889/ws/chat" WS_URL = "ws://localhost:28888/ws/chat"
RESULTS = [] RESULTS = []
def log(tag, msg): def log(tag, msg):
+1 -3
View File
@@ -5,8 +5,6 @@ import asyncio
import json import json
import sys import sys
import time import time
import urllib.request
import urllib.error
try: try:
import websockets import websockets
@@ -15,7 +13,7 @@ except ImportError:
subprocess.check_call([sys.executable, "-m", "pip", "install", "websockets", "-q"]) subprocess.check_call([sys.executable, "-m", "pip", "install", "websockets", "-q"])
import websockets import websockets
WS_URL = "ws://localhost:28889/ws/chat" WS_URL = "ws://localhost:28888/ws/chat"
API_BASE = "http://localhost:28888" API_BASE = "http://localhost:28888"
RESULTS = [] RESULTS = []
+1
View File
@@ -67,6 +67,7 @@ func (m *JwtManager) New(ctx context.Context, payload *TokenPayload) (string, er
ExpiresAt: jwt.NewNumericDate(expiresAt), ExpiresAt: jwt.NewNumericDate(expiresAt),
IssuedAt: jwt.NewNumericDate(now), IssuedAt: jwt.NewNumericDate(now),
Issuer: m.issuer, Issuer: m.issuer,
Subject: strconv.FormatInt(payload.UserId, 10),
}, },
} }
+4 -2
View File
@@ -31,6 +31,8 @@ docker compose down
端到端接口测试走网关 `http://127.0.0.1:18080``18801-18814` 是各服务的直连端口,不经过认证链路。 端到端接口测试走网关 `http://127.0.0.1:18080``18801-18814` 是各服务的直连端口,不经过认证链路。
Chat WebSocket 通过网关 `ws://127.0.0.1:18080/ws/chat` 访问。WebTransport 使用 `18443/udp``/wt/chat` 入口。
如需只启动部分服务: 如需只启动部分服务:
```bash ```bash
@@ -40,7 +42,7 @@ docker compose up -d postgres redis snowflake player-rpc player-api
## 端口映射 ## 端口映射
| 服务 | 宿主机端口 | | 服务 | 宿主机端口 |
| ---------------- | ---------- | | ---------------- | ---------------- |
| PostgreSQL | 15432 | | PostgreSQL | 15432 |
| Redis | 16379 | | Redis | 16379 |
| Kafka | 19092 | | Kafka | 19092 |
@@ -54,7 +56,7 @@ docker compose up -d postgres redis snowflake player-rpc player-api
| community-api | 18807 | | community-api | 18807 |
| objectstory-api | 18808 | | objectstory-api | 18808 |
| email-api | 18809 | | email-api | 18809 |
| chat-api | 18810 | | chat-api | 18810, 18443/udp |
| review-api | 18811 | | review-api | 18811 |
| dispute-api | 18812 | | dispute-api | 18812 |
| notification-api | 18813 | | notification-api | 18813 |
+4 -1
View File
@@ -444,7 +444,6 @@ services:
restart: unless-stopped restart: unless-stopped
ports: ports:
- "18810:8888" - "18810:8888"
- "18889:8889"
- "18443:8443/udp" - "18443:8443/udp"
volumes: volumes:
- ./certs:/etc/certs:ro - ./certs:/etc/certs:ro
@@ -464,6 +463,8 @@ services:
condition: service_started condition: service_started
order-rpc: order-rpc:
condition: service_started condition: service_started
player-rpc:
condition: service_started
dispute-api: dispute-api:
image: juwan/dispute-api:dev image: juwan/dispute-api:dev
@@ -477,6 +478,8 @@ services:
condition: service_started condition: service_started
order-rpc: order-rpc:
condition: service_started condition: service_started
player-rpc:
condition: service_started
notification-api: notification-api:
image: juwan/notification-api:dev image: juwan/notification-api:dev
+8 -2
View File
@@ -375,10 +375,12 @@ static_resources:
timeout: 30s timeout: 30s
- match: - match:
prefix: /api/v1/chat path: /ws/chat
route: route:
cluster: chat_api_cluster cluster: chat_api_cluster
timeout: 30s timeout: 0s
upgrade_configs:
- upgrade_type: websocket
- match: - match:
path: /api/v1/upload path: /api/v1/upload
@@ -597,6 +599,10 @@ static_resources:
rules: rules:
- match: - match:
path: /healthz path: /healthz
- match:
path: /ws/chat
requires:
provider_name: juwan_user_jwt
- match: - match:
prefix: /api/v1 prefix: /api/v1
headers: headers:
-70
View File
@@ -1,70 +0,0 @@
syntax = "v1"
import "common.api"
type (
SessionIdReq {
Id int64 `path:"id"`
}
CreateGroupReq {
Name string `json:"name"`
Participants []int64 `json:"participants,optional"`
}
CreateDMReq {
TargetId int64 `json:"targetId"`
}
ChatSession {
Id int64 `json:"id"`
Type string `json:"type"`
Name string `json:"name"`
CreatorId int64 `json:"creatorId"`
Participants []int64 `json:"participants"`
LastMessage string `json:"lastMessage"`
LastMessageAt int64 `json:"lastMessageAt"`
CreatedAt int64 `json:"createdAt"`
}
ChatSessionListResp {
Items []ChatSession `json:"items"`
}
ChatMessage {
Id int64 `json:"id"`
SessionId int64 `json:"sessionId"`
SenderId int64 `json:"senderId"`
Type string `json:"type"`
Content string `json:"content"`
CreatedAt int64 `json:"createdAt"`
}
ChatMessageListResp {
Items []ChatMessage `json:"items"`
}
ListMessageReq {
SessionIdReq
PageReq
}
)
@server (
prefix: api/v1/chat
group: chat
)
service chat-api {
@doc "create group session"
@handler CreateGroup
post /sessions/group (CreateGroupReq) returns (ChatSession)
@doc "create dm session"
@handler CreateDM
post /sessions/dm (CreateDMReq) returns (ChatSession)
@doc "list user sessions"
@handler ListSessions
get /sessions (PageReq) returns (ChatSessionListResp)
@doc "get session detail"
@handler GetSession
get /sessions/:id (SessionIdReq) returns (ChatSession)
@doc "get message history"
@handler ListMessages
get /sessions/:id/messages (ListMessageReq) returns (ChatMessageListResp)
}
+1 -1
View File
@@ -14,7 +14,7 @@ Juwan 是一个基于 Go-Zero 微服务框架的分布式后端系统,采用
│ │ │ │
┌───▼────────┐ ┌───▼────────┐ ┌───▼────────┐ ┌───▼────────┐
│ User API │ │ Order API │ │ User API │ │ Order API │
│ (8888) │ │ (8889) │ │ (8888) │ │ (8888) │
└───┬────────┘ └────────────┘ └───┬────────┘ └────────────┘
┌───▼────────────────────┐ ┌───▼────────────────────┐