add: chat service
This commit is contained in:
@@ -0,0 +1,143 @@
|
||||
package chat
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"juwan-backend/app/chat/api/internal/svc"
|
||||
|
||||
"github.com/wwweww/go-wst/hybrid"
|
||||
"github.com/wwweww/go-wst/protocol"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
)
|
||||
|
||||
type WsMessage struct {
|
||||
Type string `json:"type"`
|
||||
SessionId int64 `json:"sessionId,omitempty"`
|
||||
TargetId int64 `json:"targetId,omitempty"`
|
||||
Content string `json:"content,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
MsgType string `json:"msgType,omitempty"`
|
||||
}
|
||||
|
||||
type WsResponse struct {
|
||||
Type string `json:"type"`
|
||||
SessionId int64 `json:"sessionId,omitempty"`
|
||||
SenderId int64 `json:"senderId,omitempty"`
|
||||
Content string `json:"content,omitempty"`
|
||||
Data interface{} `json:"data,omitempty"`
|
||||
}
|
||||
|
||||
type Handler struct {
|
||||
svcCtx *svc.ServiceContext
|
||||
server *hybrid.Server
|
||||
}
|
||||
|
||||
var _ protocol.StatefulHandler = (*Handler)(nil)
|
||||
var _ protocol.StatelessHandler = (*Handler)(nil)
|
||||
|
||||
func NewHandler(svcCtx *svc.ServiceContext) *Handler {
|
||||
return &Handler{
|
||||
svcCtx: svcCtx,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Handler) SetServer(s *hybrid.Server) {
|
||||
h.server = s
|
||||
}
|
||||
|
||||
func (h *Handler) OnConnect(conn protocol.Connection) error {
|
||||
logx.Infof("chat connected: id=%s userID=%s protocol=%s", conn.ID(), conn.UserID(), conn.Protocol())
|
||||
if uid := conn.UserID(); uid != "" {
|
||||
h.server.BindUser(conn, uid)
|
||||
}
|
||||
return conn.SendJSON(context.Background(), WsResponse{
|
||||
Type: "connected",
|
||||
Content: "chat service connected",
|
||||
})
|
||||
}
|
||||
|
||||
func (h *Handler) OnMessage(conn protocol.Connection, raw []byte) error {
|
||||
var msg WsMessage
|
||||
if err := json.Unmarshal(raw, &msg); err != nil {
|
||||
return conn.SendJSON(context.Background(), WsResponse{
|
||||
Type: "error",
|
||||
Content: "invalid message format",
|
||||
})
|
||||
}
|
||||
|
||||
switch msg.Type {
|
||||
case "create_group":
|
||||
return h.handleCreateGroup(conn, &msg)
|
||||
case "create_dm":
|
||||
return h.handleCreateDM(conn, &msg)
|
||||
case "join":
|
||||
return h.handleJoin(conn, &msg)
|
||||
case "leave":
|
||||
return h.handleLeave(conn, &msg)
|
||||
case "message":
|
||||
return h.handleMessage(conn, &msg)
|
||||
case "history":
|
||||
return h.handleHistory(conn, &msg)
|
||||
default:
|
||||
return conn.SendJSON(context.Background(), WsResponse{
|
||||
Type: "error",
|
||||
Content: fmt.Sprintf("unknown message type: %s", msg.Type),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Handler) OnDisconnect(conn protocol.Connection, err error) {
|
||||
logx.Infof("chat disconnected: userID=%s err=%v", conn.UserID(), err)
|
||||
}
|
||||
|
||||
func (h *Handler) Fetch(ctx context.Context, req protocol.FetchRequest) ([]protocol.Message, error) {
|
||||
return h.svcCtx.MsgStore.Fetch(ctx, req.UserID, req.SinceID, req.Limit)
|
||||
}
|
||||
|
||||
func (h *Handler) Send(ctx context.Context, req protocol.SendRequest) error {
|
||||
msg := protocol.Message{
|
||||
Type: "message",
|
||||
Topic: req.Topic,
|
||||
Data: req.Data,
|
||||
}
|
||||
return h.svcCtx.MsgStore.Store(ctx, req.UserID, msg)
|
||||
}
|
||||
|
||||
func (h *Handler) getUserId(conn protocol.Connection) int64 {
|
||||
uid, _ := strconv.ParseInt(conn.UserID(), 10, 64)
|
||||
return uid
|
||||
}
|
||||
|
||||
func (h *Handler) broadcastToParticipants(participants []int64, resp WsResponse) {
|
||||
data, err := json.Marshal(resp)
|
||||
if err != nil {
|
||||
logx.Errorf("marshal error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
userIDs := make([]string, len(participants))
|
||||
for i, p := range participants {
|
||||
userIDs[i] = strconv.FormatInt(p, 10)
|
||||
}
|
||||
|
||||
if err := h.server.BroadcastTo(userIDs, data); err != nil {
|
||||
logx.Errorf("broadcastTo failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Handler) storeOfflineMessage(userID string, resp WsResponse) {
|
||||
data, err := json.Marshal(resp)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
msg := protocol.Message{
|
||||
Type: "chat",
|
||||
Data: data,
|
||||
}
|
||||
if storeErr := h.svcCtx.MsgStore.Store(context.Background(), userID, msg); storeErr != nil {
|
||||
logx.Errorf("store offline msg for %s failed: %v", userID, storeErr)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,168 @@
|
||||
package chat
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"juwan-backend/app/chat/chatcore"
|
||||
|
||||
"github.com/wwweww/go-wst/protocol"
|
||||
)
|
||||
|
||||
func (h *Handler) handleJoin(conn protocol.Connection, msg *WsMessage) error {
|
||||
uid := h.getUserId(conn)
|
||||
if uid <= 0 {
|
||||
return conn.SendJSON(context.Background(), WsResponse{
|
||||
Type: "error",
|
||||
Content: "authentication required",
|
||||
})
|
||||
}
|
||||
if msg.SessionId <= 0 {
|
||||
return conn.SendJSON(context.Background(), WsResponse{
|
||||
Type: "error",
|
||||
Content: "sessionId is required",
|
||||
})
|
||||
}
|
||||
|
||||
session, err := h.svcCtx.Store.GetSession(msg.SessionId)
|
||||
if err != nil {
|
||||
return conn.SendJSON(context.Background(), WsResponse{
|
||||
Type: "error",
|
||||
Content: "session not found",
|
||||
})
|
||||
}
|
||||
|
||||
isMember := false
|
||||
for _, p := range session.Participants {
|
||||
if p == uid {
|
||||
isMember = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !isMember {
|
||||
return conn.SendJSON(context.Background(), WsResponse{
|
||||
Type: "error",
|
||||
Content: "not a member of this session",
|
||||
})
|
||||
}
|
||||
|
||||
conn.SetMetadata("sessionId", msg.SessionId)
|
||||
|
||||
h.broadcastToParticipants(session.Participants, WsResponse{
|
||||
Type: "user_joined",
|
||||
SessionId: msg.SessionId,
|
||||
SenderId: uid,
|
||||
Content: fmt.Sprintf("user %d joined", uid),
|
||||
})
|
||||
|
||||
return conn.SendJSON(context.Background(), WsResponse{
|
||||
Type: "joined",
|
||||
SessionId: msg.SessionId,
|
||||
})
|
||||
}
|
||||
|
||||
func (h *Handler) handleLeave(conn protocol.Connection, msg *WsMessage) error {
|
||||
uid := h.getUserId(conn)
|
||||
sessionId := msg.SessionId
|
||||
if sessionId <= 0 {
|
||||
if sid, ok := conn.Metadata()["sessionId"].(int64); ok {
|
||||
sessionId = sid
|
||||
}
|
||||
}
|
||||
if sessionId <= 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
session, err := h.svcCtx.Store.GetSession(sessionId)
|
||||
if err == nil {
|
||||
h.broadcastToParticipants(session.Participants, WsResponse{
|
||||
Type: "user_left",
|
||||
SessionId: sessionId,
|
||||
SenderId: uid,
|
||||
Content: fmt.Sprintf("user %d left", uid),
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *Handler) handleMessage(conn protocol.Connection, msg *WsMessage) error {
|
||||
uid := h.getUserId(conn)
|
||||
if uid <= 0 {
|
||||
return conn.SendJSON(context.Background(), WsResponse{
|
||||
Type: "error",
|
||||
Content: "authentication required",
|
||||
})
|
||||
}
|
||||
|
||||
sessionId := msg.SessionId
|
||||
if sessionId <= 0 {
|
||||
if sid, ok := conn.Metadata()["sessionId"].(int64); ok {
|
||||
sessionId = sid
|
||||
}
|
||||
}
|
||||
if sessionId <= 0 {
|
||||
return conn.SendJSON(context.Background(), WsResponse{
|
||||
Type: "error",
|
||||
Content: "sessionId is required, join a session first",
|
||||
})
|
||||
}
|
||||
|
||||
msgType := chatcore.MessageType(msg.MsgType)
|
||||
if msgType == "" {
|
||||
msgType = chatcore.MessageTypeText
|
||||
}
|
||||
|
||||
chatMsg, err := h.svcCtx.Store.AddMessage(sessionId, uid, msgType, msg.Content)
|
||||
if err != nil {
|
||||
return conn.SendJSON(context.Background(), WsResponse{
|
||||
Type: "error",
|
||||
Content: err.Error(),
|
||||
})
|
||||
}
|
||||
|
||||
session, err := h.svcCtx.Store.GetSession(sessionId)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
outMsg := WsResponse{
|
||||
Type: "message",
|
||||
SessionId: sessionId,
|
||||
SenderId: uid,
|
||||
Content: msg.Content,
|
||||
Data: map[string]int64{"messageId": chatMsg.Id},
|
||||
}
|
||||
|
||||
h.broadcastToParticipants(session.Participants, outMsg)
|
||||
|
||||
for _, p := range session.Participants {
|
||||
if p == uid {
|
||||
continue
|
||||
}
|
||||
userIdStr := strconv.FormatInt(p, 10)
|
||||
if _, online := h.server.GetConnection(userIdStr); !online {
|
||||
h.storeOfflineMessage(userIdStr, outMsg)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *Handler) handleHistory(conn protocol.Connection, msg *WsMessage) error {
|
||||
if msg.SessionId <= 0 {
|
||||
return conn.SendJSON(context.Background(), WsResponse{
|
||||
Type: "error",
|
||||
Content: "sessionId is required",
|
||||
})
|
||||
}
|
||||
|
||||
messages := h.svcCtx.Store.GetMessages(msg.SessionId, 0, 50)
|
||||
|
||||
return conn.SendJSON(context.Background(), WsResponse{
|
||||
Type: "history",
|
||||
SessionId: msg.SessionId,
|
||||
Data: messages,
|
||||
})
|
||||
}
|
||||
@@ -0,0 +1,62 @@
|
||||
package chat
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"juwan-backend/app/chat/chatcore"
|
||||
|
||||
"github.com/wwweww/go-wst/protocol"
|
||||
)
|
||||
|
||||
func (h *Handler) handleCreateGroup(conn protocol.Connection, msg *WsMessage) error {
|
||||
uid := h.getUserId(conn)
|
||||
if uid <= 0 {
|
||||
return conn.SendJSON(context.Background(), WsResponse{
|
||||
Type: "error",
|
||||
Content: "authentication required",
|
||||
})
|
||||
}
|
||||
|
||||
session, err := h.svcCtx.Store.CreateSession(chatcore.SessionTypeGroup, msg.Name, uid, []int64{uid})
|
||||
if err != nil {
|
||||
return conn.SendJSON(context.Background(), WsResponse{
|
||||
Type: "error",
|
||||
Content: err.Error(),
|
||||
})
|
||||
}
|
||||
|
||||
return conn.SendJSON(context.Background(), WsResponse{
|
||||
Type: "group_created",
|
||||
SessionId: session.Id,
|
||||
Content: msg.Name,
|
||||
})
|
||||
}
|
||||
|
||||
func (h *Handler) handleCreateDM(conn protocol.Connection, msg *WsMessage) error {
|
||||
uid := h.getUserId(conn)
|
||||
if uid <= 0 {
|
||||
return conn.SendJSON(context.Background(), WsResponse{
|
||||
Type: "error",
|
||||
Content: "authentication required",
|
||||
})
|
||||
}
|
||||
if msg.TargetId <= 0 {
|
||||
return conn.SendJSON(context.Background(), WsResponse{
|
||||
Type: "error",
|
||||
Content: "targetId is required",
|
||||
})
|
||||
}
|
||||
|
||||
session, err := h.svcCtx.Store.CreateSession(chatcore.SessionTypeDM, "", uid, []int64{uid, msg.TargetId})
|
||||
if err != nil {
|
||||
return conn.SendJSON(context.Background(), WsResponse{
|
||||
Type: "error",
|
||||
Content: err.Error(),
|
||||
})
|
||||
}
|
||||
|
||||
return conn.SendJSON(context.Background(), WsResponse{
|
||||
Type: "dm_created",
|
||||
SessionId: session.Id,
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user