From 756ca20c6d7f2272bac1e15aceebf9b373e56536 Mon Sep 17 00:00:00 2001 From: wwweww <2646787260@qq.com> Date: Fri, 24 Apr 2026 20:43:53 +0800 Subject: [PATCH] add: chat service --- app/chat/api/chat.go | 37 +++ app/chat/api/etc/chat-api.yaml | 34 +++ app/chat/api/internal/config/config.go | 13 + app/chat/api/internal/handler/chat/handler.go | 143 +++++++++ .../api/internal/handler/chat/messaging.go | 168 +++++++++++ .../api/internal/handler/chat/sessions.go | 62 ++++ app/chat/api/internal/svc/serviceContext.go | 22 ++ app/chat/chatcore/store.go | 252 ++++++++++++++++ app/chat/rpc/chatservice/chatService.go | 114 +++++++ app/chat/rpc/etc/pb.yaml | 5 + app/chat/rpc/internal/config/config.go | 7 + .../internal/logic/addChatMessagesLogic.go | 69 +++++ .../internal/logic/addChatSessionsLogic.go | 64 ++++ .../rpc/internal/logic/addParticipantLogic.go | 46 +++ .../internal/logic/delChatMessagesLogic.go | 45 +++ .../internal/logic/delChatSessionsLogic.go | 35 +++ .../logic/getChatMessagesByIdLogic.go | 38 +++ .../logic/getChatSessionsByIdLogic.go | 38 +++ app/chat/rpc/internal/logic/helpers.go | 10 + .../internal/logic/removeParticipantLogic.go | 47 +++ .../internal/logic/searchChatMessagesLogic.go | 58 ++++ .../internal/logic/searchChatSessionsLogic.go | 65 ++++ .../internal/logic/updateChatSessionsLogic.go | 49 +++ .../rpc/internal/server/chatServiceServer.go | 75 +++++ app/chat/rpc/internal/svc/serviceContext.go | 15 + app/chat/rpc/internal/svc/store.go | 38 +++ app/chat/rpc/pb.go | 39 +++ app/chat/rpc/pb/chat_grpc.go | 175 +++++++++++ app/chat/rpc/pb/chat_grpc_handlers.go | 161 ++++++++++ app/chat/rpc/pb/chat_messages.go | 179 +++++++++++ app/chat/rpc/pb/chat_sessions.go | 282 ++++++++++++++++++ app/chat/test/Dockerfile.api | 18 ++ app/chat/test/Dockerfile.rpc | 18 ++ app/chat/test/certs/tls.crt | 27 ++ app/chat/test/certs/tls.key | 28 ++ app/chat/test/chat-api-local.yaml | 29 ++ app/chat/test/chat-api-test.yaml | 37 +++ app/chat/test/docker-compose.yml | 17 ++ app/chat/test/pb-local.yaml | 5 + app/chat/test/run_tests.sh | 62 ++++ app/chat/test/test_ws.py | 125 ++++++++ app/chat/test/test_wt.py | 129 ++++++++ desc/rpc/chat.proto | 155 ++++++++++ 43 files changed, 3035 insertions(+) create mode 100644 app/chat/api/chat.go create mode 100644 app/chat/api/etc/chat-api.yaml create mode 100644 app/chat/api/internal/config/config.go create mode 100644 app/chat/api/internal/handler/chat/handler.go create mode 100644 app/chat/api/internal/handler/chat/messaging.go create mode 100644 app/chat/api/internal/handler/chat/sessions.go create mode 100644 app/chat/api/internal/svc/serviceContext.go create mode 100644 app/chat/chatcore/store.go create mode 100644 app/chat/rpc/chatservice/chatService.go create mode 100644 app/chat/rpc/etc/pb.yaml create mode 100644 app/chat/rpc/internal/config/config.go create mode 100644 app/chat/rpc/internal/logic/addChatMessagesLogic.go create mode 100644 app/chat/rpc/internal/logic/addChatSessionsLogic.go create mode 100644 app/chat/rpc/internal/logic/addParticipantLogic.go create mode 100644 app/chat/rpc/internal/logic/delChatMessagesLogic.go create mode 100644 app/chat/rpc/internal/logic/delChatSessionsLogic.go create mode 100644 app/chat/rpc/internal/logic/getChatMessagesByIdLogic.go create mode 100644 app/chat/rpc/internal/logic/getChatSessionsByIdLogic.go create mode 100644 app/chat/rpc/internal/logic/helpers.go create mode 100644 app/chat/rpc/internal/logic/removeParticipantLogic.go create mode 100644 app/chat/rpc/internal/logic/searchChatMessagesLogic.go create mode 100644 app/chat/rpc/internal/logic/searchChatSessionsLogic.go create mode 100644 app/chat/rpc/internal/logic/updateChatSessionsLogic.go create mode 100644 app/chat/rpc/internal/server/chatServiceServer.go create mode 100644 app/chat/rpc/internal/svc/serviceContext.go create mode 100644 app/chat/rpc/internal/svc/store.go create mode 100644 app/chat/rpc/pb.go create mode 100644 app/chat/rpc/pb/chat_grpc.go create mode 100644 app/chat/rpc/pb/chat_grpc_handlers.go create mode 100644 app/chat/rpc/pb/chat_messages.go create mode 100644 app/chat/rpc/pb/chat_sessions.go create mode 100644 app/chat/test/Dockerfile.api create mode 100644 app/chat/test/Dockerfile.rpc create mode 100644 app/chat/test/certs/tls.crt create mode 100644 app/chat/test/certs/tls.key create mode 100644 app/chat/test/chat-api-local.yaml create mode 100644 app/chat/test/chat-api-test.yaml create mode 100644 app/chat/test/docker-compose.yml create mode 100644 app/chat/test/pb-local.yaml create mode 100755 app/chat/test/run_tests.sh create mode 100644 app/chat/test/test_ws.py create mode 100644 app/chat/test/test_wt.py create mode 100644 desc/rpc/chat.proto diff --git a/app/chat/api/chat.go b/app/chat/api/chat.go new file mode 100644 index 0000000..7c4de6c --- /dev/null +++ b/app/chat/api/chat.go @@ -0,0 +1,37 @@ +package main + +import ( + "flag" + "fmt" + + chathandler "juwan-backend/app/chat/api/internal/handler/chat" + + "juwan-backend/app/chat/api/internal/config" + "juwan-backend/app/chat/api/internal/svc" + + "github.com/wwweww/go-wst/hybrid" + "github.com/zeromicro/go-zero/core/conf" + "github.com/zeromicro/go-zero/core/service" +) + +var configFile = flag.String("f", "etc/chat-api.yaml", "the config file") + +func main() { + flag.Parse() + + var c config.Config + conf.MustLoad(*configFile, &c) + svcCtx := svc.NewServiceContext(c) + + handler := chathandler.NewHandler(svcCtx) + hybridServer := hybrid.MustNewServer(c.Hybrid, handler) + handler.SetServer(hybridServer) + + group := service.NewServiceGroup() + defer group.Stop() + + group.Add(hybridServer) + + fmt.Printf("Starting chat hybrid server (ws=%s%s)...\n", c.Hybrid.Ws.Addr, c.Hybrid.Ws.Path) + group.Start() +} diff --git a/app/chat/api/etc/chat-api.yaml b/app/chat/api/etc/chat-api.yaml new file mode 100644 index 0000000..f0b5509 --- /dev/null +++ b/app/chat/api/etc/chat-api.yaml @@ -0,0 +1,34 @@ +Name: chat-api +Host: 0.0.0.0 +Port: 8888 + +Hybrid: + Name: chat-hybrid + Protocol: auto + Ws: + Name: chat-ws + Addr: :8889 + Path: /ws/chat + MaxConnections: 10000 + Auth: + Enabled: true + Source: envoy-header + HeaderName: X-User-ID + Wt: + Addr: :8443 + Path: /wt/chat + CertFile: /etc/certs/tls.crt + KeyFile: /etc/certs/tls.key + FallbackStrategy: auto + MaxRetries: 3 + MaxConnections: 10000 + Auth: + Enabled: true + WsHeaderName: X-User-ID + +Stateless: + PollInterval: 100ms + BatchSize: 100 + +Log: + Level: info diff --git a/app/chat/api/internal/config/config.go b/app/chat/api/internal/config/config.go new file mode 100644 index 0000000..84c9d68 --- /dev/null +++ b/app/chat/api/internal/config/config.go @@ -0,0 +1,13 @@ +package config + +import ( + "github.com/wwweww/go-wst/hybrid" + "github.com/wwweww/go-wst/stateless" + "github.com/zeromicro/go-zero/rest" +) + +type Config struct { + rest.RestConf + Hybrid hybrid.HybridConf + Stateless stateless.Config +} diff --git a/app/chat/api/internal/handler/chat/handler.go b/app/chat/api/internal/handler/chat/handler.go new file mode 100644 index 0000000..5977014 --- /dev/null +++ b/app/chat/api/internal/handler/chat/handler.go @@ -0,0 +1,143 @@ +package chat + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + + "juwan-backend/app/chat/api/internal/svc" + + "github.com/wwweww/go-wst/hybrid" + "github.com/wwweww/go-wst/protocol" + "github.com/zeromicro/go-zero/core/logx" +) + +type WsMessage struct { + Type string `json:"type"` + SessionId int64 `json:"sessionId,omitempty"` + TargetId int64 `json:"targetId,omitempty"` + Content string `json:"content,omitempty"` + Name string `json:"name,omitempty"` + MsgType string `json:"msgType,omitempty"` +} + +type WsResponse struct { + Type string `json:"type"` + SessionId int64 `json:"sessionId,omitempty"` + SenderId int64 `json:"senderId,omitempty"` + Content string `json:"content,omitempty"` + Data interface{} `json:"data,omitempty"` +} + +type Handler struct { + svcCtx *svc.ServiceContext + server *hybrid.Server +} + +var _ protocol.StatefulHandler = (*Handler)(nil) +var _ protocol.StatelessHandler = (*Handler)(nil) + +func NewHandler(svcCtx *svc.ServiceContext) *Handler { + return &Handler{ + svcCtx: svcCtx, + } +} + +func (h *Handler) SetServer(s *hybrid.Server) { + h.server = s +} + +func (h *Handler) OnConnect(conn protocol.Connection) error { + logx.Infof("chat connected: id=%s userID=%s protocol=%s", conn.ID(), conn.UserID(), conn.Protocol()) + if uid := conn.UserID(); uid != "" { + h.server.BindUser(conn, uid) + } + return conn.SendJSON(context.Background(), WsResponse{ + Type: "connected", + Content: "chat service connected", + }) +} + +func (h *Handler) OnMessage(conn protocol.Connection, raw []byte) error { + var msg WsMessage + if err := json.Unmarshal(raw, &msg); err != nil { + return conn.SendJSON(context.Background(), WsResponse{ + Type: "error", + Content: "invalid message format", + }) + } + + switch msg.Type { + case "create_group": + return h.handleCreateGroup(conn, &msg) + case "create_dm": + return h.handleCreateDM(conn, &msg) + case "join": + return h.handleJoin(conn, &msg) + case "leave": + return h.handleLeave(conn, &msg) + case "message": + return h.handleMessage(conn, &msg) + case "history": + return h.handleHistory(conn, &msg) + default: + return conn.SendJSON(context.Background(), WsResponse{ + Type: "error", + Content: fmt.Sprintf("unknown message type: %s", msg.Type), + }) + } +} + +func (h *Handler) OnDisconnect(conn protocol.Connection, err error) { + logx.Infof("chat disconnected: userID=%s err=%v", conn.UserID(), err) +} + +func (h *Handler) Fetch(ctx context.Context, req protocol.FetchRequest) ([]protocol.Message, error) { + return h.svcCtx.MsgStore.Fetch(ctx, req.UserID, req.SinceID, req.Limit) +} + +func (h *Handler) Send(ctx context.Context, req protocol.SendRequest) error { + msg := protocol.Message{ + Type: "message", + Topic: req.Topic, + Data: req.Data, + } + return h.svcCtx.MsgStore.Store(ctx, req.UserID, msg) +} + +func (h *Handler) getUserId(conn protocol.Connection) int64 { + uid, _ := strconv.ParseInt(conn.UserID(), 10, 64) + return uid +} + +func (h *Handler) broadcastToParticipants(participants []int64, resp WsResponse) { + data, err := json.Marshal(resp) + if err != nil { + logx.Errorf("marshal error: %v", err) + return + } + + userIDs := make([]string, len(participants)) + for i, p := range participants { + userIDs[i] = strconv.FormatInt(p, 10) + } + + if err := h.server.BroadcastTo(userIDs, data); err != nil { + logx.Errorf("broadcastTo failed: %v", err) + } +} + +func (h *Handler) storeOfflineMessage(userID string, resp WsResponse) { + data, err := json.Marshal(resp) + if err != nil { + return + } + msg := protocol.Message{ + Type: "chat", + Data: data, + } + if storeErr := h.svcCtx.MsgStore.Store(context.Background(), userID, msg); storeErr != nil { + logx.Errorf("store offline msg for %s failed: %v", userID, storeErr) + } +} diff --git a/app/chat/api/internal/handler/chat/messaging.go b/app/chat/api/internal/handler/chat/messaging.go new file mode 100644 index 0000000..b218552 --- /dev/null +++ b/app/chat/api/internal/handler/chat/messaging.go @@ -0,0 +1,168 @@ +package chat + +import ( + "context" + "fmt" + "strconv" + + "juwan-backend/app/chat/chatcore" + + "github.com/wwweww/go-wst/protocol" +) + +func (h *Handler) handleJoin(conn protocol.Connection, msg *WsMessage) error { + uid := h.getUserId(conn) + if uid <= 0 { + return conn.SendJSON(context.Background(), WsResponse{ + Type: "error", + Content: "authentication required", + }) + } + if msg.SessionId <= 0 { + return conn.SendJSON(context.Background(), WsResponse{ + Type: "error", + Content: "sessionId is required", + }) + } + + session, err := h.svcCtx.Store.GetSession(msg.SessionId) + if err != nil { + return conn.SendJSON(context.Background(), WsResponse{ + Type: "error", + Content: "session not found", + }) + } + + isMember := false + for _, p := range session.Participants { + if p == uid { + isMember = true + break + } + } + if !isMember { + return conn.SendJSON(context.Background(), WsResponse{ + Type: "error", + Content: "not a member of this session", + }) + } + + conn.SetMetadata("sessionId", msg.SessionId) + + h.broadcastToParticipants(session.Participants, WsResponse{ + Type: "user_joined", + SessionId: msg.SessionId, + SenderId: uid, + Content: fmt.Sprintf("user %d joined", uid), + }) + + return conn.SendJSON(context.Background(), WsResponse{ + Type: "joined", + SessionId: msg.SessionId, + }) +} + +func (h *Handler) handleLeave(conn protocol.Connection, msg *WsMessage) error { + uid := h.getUserId(conn) + sessionId := msg.SessionId + if sessionId <= 0 { + if sid, ok := conn.Metadata()["sessionId"].(int64); ok { + sessionId = sid + } + } + if sessionId <= 0 { + return nil + } + + session, err := h.svcCtx.Store.GetSession(sessionId) + if err == nil { + h.broadcastToParticipants(session.Participants, WsResponse{ + Type: "user_left", + SessionId: sessionId, + SenderId: uid, + Content: fmt.Sprintf("user %d left", uid), + }) + } + + return nil +} + +func (h *Handler) handleMessage(conn protocol.Connection, msg *WsMessage) error { + uid := h.getUserId(conn) + if uid <= 0 { + return conn.SendJSON(context.Background(), WsResponse{ + Type: "error", + Content: "authentication required", + }) + } + + sessionId := msg.SessionId + if sessionId <= 0 { + if sid, ok := conn.Metadata()["sessionId"].(int64); ok { + sessionId = sid + } + } + if sessionId <= 0 { + return conn.SendJSON(context.Background(), WsResponse{ + Type: "error", + Content: "sessionId is required, join a session first", + }) + } + + msgType := chatcore.MessageType(msg.MsgType) + if msgType == "" { + msgType = chatcore.MessageTypeText + } + + chatMsg, err := h.svcCtx.Store.AddMessage(sessionId, uid, msgType, msg.Content) + if err != nil { + return conn.SendJSON(context.Background(), WsResponse{ + Type: "error", + Content: err.Error(), + }) + } + + session, err := h.svcCtx.Store.GetSession(sessionId) + if err != nil { + return nil + } + + outMsg := WsResponse{ + Type: "message", + SessionId: sessionId, + SenderId: uid, + Content: msg.Content, + Data: map[string]int64{"messageId": chatMsg.Id}, + } + + h.broadcastToParticipants(session.Participants, outMsg) + + for _, p := range session.Participants { + if p == uid { + continue + } + userIdStr := strconv.FormatInt(p, 10) + if _, online := h.server.GetConnection(userIdStr); !online { + h.storeOfflineMessage(userIdStr, outMsg) + } + } + + return nil +} + +func (h *Handler) handleHistory(conn protocol.Connection, msg *WsMessage) error { + if msg.SessionId <= 0 { + return conn.SendJSON(context.Background(), WsResponse{ + Type: "error", + Content: "sessionId is required", + }) + } + + messages := h.svcCtx.Store.GetMessages(msg.SessionId, 0, 50) + + return conn.SendJSON(context.Background(), WsResponse{ + Type: "history", + SessionId: msg.SessionId, + Data: messages, + }) +} diff --git a/app/chat/api/internal/handler/chat/sessions.go b/app/chat/api/internal/handler/chat/sessions.go new file mode 100644 index 0000000..162e130 --- /dev/null +++ b/app/chat/api/internal/handler/chat/sessions.go @@ -0,0 +1,62 @@ +package chat + +import ( + "context" + + "juwan-backend/app/chat/chatcore" + + "github.com/wwweww/go-wst/protocol" +) + +func (h *Handler) handleCreateGroup(conn protocol.Connection, msg *WsMessage) error { + uid := h.getUserId(conn) + if uid <= 0 { + return conn.SendJSON(context.Background(), WsResponse{ + Type: "error", + Content: "authentication required", + }) + } + + session, err := h.svcCtx.Store.CreateSession(chatcore.SessionTypeGroup, msg.Name, uid, []int64{uid}) + if err != nil { + return conn.SendJSON(context.Background(), WsResponse{ + Type: "error", + Content: err.Error(), + }) + } + + return conn.SendJSON(context.Background(), WsResponse{ + Type: "group_created", + SessionId: session.Id, + Content: msg.Name, + }) +} + +func (h *Handler) handleCreateDM(conn protocol.Connection, msg *WsMessage) error { + uid := h.getUserId(conn) + if uid <= 0 { + return conn.SendJSON(context.Background(), WsResponse{ + Type: "error", + Content: "authentication required", + }) + } + if msg.TargetId <= 0 { + return conn.SendJSON(context.Background(), WsResponse{ + Type: "error", + Content: "targetId is required", + }) + } + + session, err := h.svcCtx.Store.CreateSession(chatcore.SessionTypeDM, "", uid, []int64{uid, msg.TargetId}) + if err != nil { + return conn.SendJSON(context.Background(), WsResponse{ + Type: "error", + Content: err.Error(), + }) + } + + return conn.SendJSON(context.Background(), WsResponse{ + Type: "dm_created", + SessionId: session.Id, + }) +} diff --git a/app/chat/api/internal/svc/serviceContext.go b/app/chat/api/internal/svc/serviceContext.go new file mode 100644 index 0000000..7fe1906 --- /dev/null +++ b/app/chat/api/internal/svc/serviceContext.go @@ -0,0 +1,22 @@ +package svc + +import ( + "juwan-backend/app/chat/api/internal/config" + "juwan-backend/app/chat/chatcore" + + "github.com/wwweww/go-wst/stateless" +) + +type ServiceContext struct { + Config config.Config + Store *chatcore.Store + MsgStore *stateless.MemoryStore +} + +func NewServiceContext(c config.Config) *ServiceContext { + return &ServiceContext{ + Config: c, + Store: chatcore.NewStore(), + MsgStore: stateless.NewMemoryStore(), + } +} diff --git a/app/chat/chatcore/store.go b/app/chat/chatcore/store.go new file mode 100644 index 0000000..17fa313 --- /dev/null +++ b/app/chat/chatcore/store.go @@ -0,0 +1,252 @@ +package chatcore + +import ( + "errors" + "sync" + "time" +) + +type SessionType string + +const ( + SessionTypeGroup SessionType = "group" + SessionTypeDM SessionType = "dm" +) + +type MessageType string + +const ( + MessageTypeText MessageType = "text" + MessageTypeImage MessageType = "image" + MessageTypeSystem MessageType = "system" +) + +type Session struct { + Id int64 `json:"id"` + Type SessionType `json:"type"` + Name string `json:"name"` + CreatorId int64 `json:"creatorId"` + Participants []int64 `json:"participants"` + LastMessage string `json:"lastMessage"` + LastMessageAt int64 `json:"lastMessageAt"` + CreatedAt int64 `json:"createdAt"` + UpdatedAt int64 `json:"updatedAt"` +} + +type Message struct { + Id int64 `json:"id"` + SessionId int64 `json:"sessionId"` + SenderId int64 `json:"senderId"` + Type MessageType `json:"type"` + Content string `json:"content"` + CreatedAt int64 `json:"createdAt"` +} + +type Store struct { + mu sync.RWMutex + + nextSessionID int64 + nextMessageID int64 + + Sessions map[int64]*Session + Messages map[int64]*Message + SessionMessages map[int64][]int64 // sessionId -> []messageId +} + +func NewStore() *Store { + return &Store{ + nextSessionID: 1000, + nextMessageID: 1000, + Sessions: make(map[int64]*Session), + Messages: make(map[int64]*Message), + SessionMessages: make(map[int64][]int64), + } +} + +func (s *Store) CreateSession(typ SessionType, name string, creatorId int64, participants []int64) (*Session, error) { + if creatorId <= 0 { + return nil, errors.New("creatorId is required") + } + + s.mu.Lock() + defer s.mu.Unlock() + + now := time.Now().Unix() + ps := append([]int64(nil), participants...) + hasCreator := false + for _, p := range ps { + if p == creatorId { + hasCreator = true + break + } + } + if !hasCreator { + ps = append(ps, creatorId) + } + + s.nextSessionID++ + session := &Session{ + Id: s.nextSessionID, + Type: typ, + Name: name, + CreatorId: creatorId, + Participants: ps, + CreatedAt: now, + UpdatedAt: now, + } + s.Sessions[session.Id] = session + return session, nil +} + +func (s *Store) GetSession(id int64) (*Session, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + session, ok := s.Sessions[id] + if !ok { + return nil, errors.New("session not found") + } + return session, nil +} + +func (s *Store) ListUserSessions(userId int64, page, limit int) []*Session { + s.mu.RLock() + defer s.mu.RUnlock() + + var results []*Session + for _, sess := range s.Sessions { + for _, p := range sess.Participants { + if p == userId { + results = append(results, sess) + break + } + } + } + + offset := page * limit + if offset >= len(results) { + return nil + } + end := offset + limit + if end > len(results) { + end = len(results) + } + return results[offset:end] +} + +func (s *Store) AddParticipant(sessionId, userId int64) error { + s.mu.Lock() + defer s.mu.Unlock() + + session, ok := s.Sessions[sessionId] + if !ok { + return errors.New("session not found") + } + for _, p := range session.Participants { + if p == userId { + return nil + } + } + session.Participants = append(session.Participants, userId) + session.UpdatedAt = time.Now().Unix() + return nil +} + +func (s *Store) RemoveParticipant(sessionId, userId int64) error { + s.mu.Lock() + defer s.mu.Unlock() + + session, ok := s.Sessions[sessionId] + if !ok { + return errors.New("session not found") + } + filtered := make([]int64, 0, len(session.Participants)) + for _, p := range session.Participants { + if p != userId { + filtered = append(filtered, p) + } + } + session.Participants = filtered + session.UpdatedAt = time.Now().Unix() + return nil +} + +func (s *Store) AddMessage(sessionId, senderId int64, msgType MessageType, content string) (*Message, error) { + if sessionId <= 0 { + return nil, errors.New("sessionId is required") + } + if senderId <= 0 { + return nil, errors.New("senderId is required") + } + if content == "" { + return nil, errors.New("content is required") + } + + s.mu.Lock() + defer s.mu.Unlock() + + session, ok := s.Sessions[sessionId] + if !ok { + return nil, errors.New("session not found") + } + + now := time.Now().Unix() + if msgType == "" { + msgType = MessageTypeText + } + + s.nextMessageID++ + msg := &Message{ + Id: s.nextMessageID, + SessionId: sessionId, + SenderId: senderId, + Type: msgType, + Content: content, + CreatedAt: now, + } + s.Messages[msg.Id] = msg + s.SessionMessages[sessionId] = append(s.SessionMessages[sessionId], msg.Id) + + session.LastMessage = content + session.LastMessageAt = now + session.UpdatedAt = now + + return msg, nil +} + +func (s *Store) GetMessages(sessionId int64, page, limit int) []*Message { + s.mu.RLock() + defer s.mu.RUnlock() + + msgIDs := s.SessionMessages[sessionId] + var results []*Message + for _, id := range msgIDs { + if msg, ok := s.Messages[id]; ok { + results = append(results, msg) + } + } + + if limit <= 0 { + limit = 50 + } + offset := page * limit + if offset >= len(results) { + return nil + } + end := offset + limit + if end > len(results) { + end = len(results) + } + return results[offset:end] +} + +func (s *Store) DeleteSession(id int64) { + s.mu.Lock() + defer s.mu.Unlock() + + delete(s.Sessions, id) + for _, msgId := range s.SessionMessages[id] { + delete(s.Messages, msgId) + } + delete(s.SessionMessages, id) +} diff --git a/app/chat/rpc/chatservice/chatService.go b/app/chat/rpc/chatservice/chatService.go new file mode 100644 index 0000000..f3cdfc1 --- /dev/null +++ b/app/chat/rpc/chatservice/chatService.go @@ -0,0 +1,114 @@ +package chatservice + +import ( + "context" + + "juwan-backend/app/chat/rpc/pb" + + "github.com/zeromicro/go-zero/zrpc" + "google.golang.org/grpc" +) + +type ( + ChatSessions = pb.ChatSessions + AddChatSessionsReq = pb.AddChatSessionsReq + AddChatSessionsResp = pb.AddChatSessionsResp + UpdateChatSessionsReq = pb.UpdateChatSessionsReq + UpdateChatSessionsResp = pb.UpdateChatSessionsResp + DelChatSessionsReq = pb.DelChatSessionsReq + DelChatSessionsResp = pb.DelChatSessionsResp + GetChatSessionsByIdReq = pb.GetChatSessionsByIdReq + GetChatSessionsByIdResp = pb.GetChatSessionsByIdResp + SearchChatSessionsReq = pb.SearchChatSessionsReq + SearchChatSessionsResp = pb.SearchChatSessionsResp + AddParticipantReq = pb.AddParticipantReq + AddParticipantResp = pb.AddParticipantResp + RemoveParticipantReq = pb.RemoveParticipantReq + RemoveParticipantResp = pb.RemoveParticipantResp + ChatMessages = pb.ChatMessages + AddChatMessagesReq = pb.AddChatMessagesReq + AddChatMessagesResp = pb.AddChatMessagesResp + DelChatMessagesReq = pb.DelChatMessagesReq + DelChatMessagesResp = pb.DelChatMessagesResp + GetChatMessagesByIdReq = pb.GetChatMessagesByIdReq + GetChatMessagesByIdResp = pb.GetChatMessagesByIdResp + SearchChatMessagesReq = pb.SearchChatMessagesReq + SearchChatMessagesResp = pb.SearchChatMessagesResp + + ChatService interface { + AddChatSessions(ctx context.Context, in *AddChatSessionsReq, opts ...grpc.CallOption) (*AddChatSessionsResp, error) + UpdateChatSessions(ctx context.Context, in *UpdateChatSessionsReq, opts ...grpc.CallOption) (*UpdateChatSessionsResp, error) + DelChatSessions(ctx context.Context, in *DelChatSessionsReq, opts ...grpc.CallOption) (*DelChatSessionsResp, error) + GetChatSessionsById(ctx context.Context, in *GetChatSessionsByIdReq, opts ...grpc.CallOption) (*GetChatSessionsByIdResp, error) + SearchChatSessions(ctx context.Context, in *SearchChatSessionsReq, opts ...grpc.CallOption) (*SearchChatSessionsResp, error) + AddParticipant(ctx context.Context, in *AddParticipantReq, opts ...grpc.CallOption) (*AddParticipantResp, error) + RemoveParticipant(ctx context.Context, in *RemoveParticipantReq, opts ...grpc.CallOption) (*RemoveParticipantResp, error) + AddChatMessages(ctx context.Context, in *AddChatMessagesReq, opts ...grpc.CallOption) (*AddChatMessagesResp, error) + DelChatMessages(ctx context.Context, in *DelChatMessagesReq, opts ...grpc.CallOption) (*DelChatMessagesResp, error) + GetChatMessagesById(ctx context.Context, in *GetChatMessagesByIdReq, opts ...grpc.CallOption) (*GetChatMessagesByIdResp, error) + SearchChatMessages(ctx context.Context, in *SearchChatMessagesReq, opts ...grpc.CallOption) (*SearchChatMessagesResp, error) + } + + defaultChatService struct { + cli zrpc.Client + } +) + +func NewChatService(cli zrpc.Client) ChatService { + return &defaultChatService{cli: cli} +} + +func (m *defaultChatService) AddChatSessions(ctx context.Context, in *AddChatSessionsReq, opts ...grpc.CallOption) (*AddChatSessionsResp, error) { + client := pb.NewChatServiceClient(m.cli.Conn()) + return client.AddChatSessions(ctx, in, opts...) +} + +func (m *defaultChatService) UpdateChatSessions(ctx context.Context, in *UpdateChatSessionsReq, opts ...grpc.CallOption) (*UpdateChatSessionsResp, error) { + client := pb.NewChatServiceClient(m.cli.Conn()) + return client.UpdateChatSessions(ctx, in, opts...) +} + +func (m *defaultChatService) DelChatSessions(ctx context.Context, in *DelChatSessionsReq, opts ...grpc.CallOption) (*DelChatSessionsResp, error) { + client := pb.NewChatServiceClient(m.cli.Conn()) + return client.DelChatSessions(ctx, in, opts...) +} + +func (m *defaultChatService) GetChatSessionsById(ctx context.Context, in *GetChatSessionsByIdReq, opts ...grpc.CallOption) (*GetChatSessionsByIdResp, error) { + client := pb.NewChatServiceClient(m.cli.Conn()) + return client.GetChatSessionsById(ctx, in, opts...) +} + +func (m *defaultChatService) SearchChatSessions(ctx context.Context, in *SearchChatSessionsReq, opts ...grpc.CallOption) (*SearchChatSessionsResp, error) { + client := pb.NewChatServiceClient(m.cli.Conn()) + return client.SearchChatSessions(ctx, in, opts...) +} + +func (m *defaultChatService) AddParticipant(ctx context.Context, in *AddParticipantReq, opts ...grpc.CallOption) (*AddParticipantResp, error) { + client := pb.NewChatServiceClient(m.cli.Conn()) + return client.AddParticipant(ctx, in, opts...) +} + +func (m *defaultChatService) RemoveParticipant(ctx context.Context, in *RemoveParticipantReq, opts ...grpc.CallOption) (*RemoveParticipantResp, error) { + client := pb.NewChatServiceClient(m.cli.Conn()) + return client.RemoveParticipant(ctx, in, opts...) +} + +func (m *defaultChatService) AddChatMessages(ctx context.Context, in *AddChatMessagesReq, opts ...grpc.CallOption) (*AddChatMessagesResp, error) { + client := pb.NewChatServiceClient(m.cli.Conn()) + return client.AddChatMessages(ctx, in, opts...) +} + +func (m *defaultChatService) DelChatMessages(ctx context.Context, in *DelChatMessagesReq, opts ...grpc.CallOption) (*DelChatMessagesResp, error) { + client := pb.NewChatServiceClient(m.cli.Conn()) + return client.DelChatMessages(ctx, in, opts...) +} + +func (m *defaultChatService) GetChatMessagesById(ctx context.Context, in *GetChatMessagesByIdReq, opts ...grpc.CallOption) (*GetChatMessagesByIdResp, error) { + client := pb.NewChatServiceClient(m.cli.Conn()) + return client.GetChatMessagesById(ctx, in, opts...) +} + +func (m *defaultChatService) SearchChatMessages(ctx context.Context, in *SearchChatMessagesReq, opts ...grpc.CallOption) (*SearchChatMessagesResp, error) { + client := pb.NewChatServiceClient(m.cli.Conn()) + return client.SearchChatMessages(ctx, in, opts...) +} diff --git a/app/chat/rpc/etc/pb.yaml b/app/chat/rpc/etc/pb.yaml new file mode 100644 index 0000000..ce7098f --- /dev/null +++ b/app/chat/rpc/etc/pb.yaml @@ -0,0 +1,5 @@ +Name: pb.rpc +ListenOn: 0.0.0.0:8080 + +Log: + Level: debug diff --git a/app/chat/rpc/internal/config/config.go b/app/chat/rpc/internal/config/config.go new file mode 100644 index 0000000..c1f85b9 --- /dev/null +++ b/app/chat/rpc/internal/config/config.go @@ -0,0 +1,7 @@ +package config + +import "github.com/zeromicro/go-zero/zrpc" + +type Config struct { + zrpc.RpcServerConf +} diff --git a/app/chat/rpc/internal/logic/addChatMessagesLogic.go b/app/chat/rpc/internal/logic/addChatMessagesLogic.go new file mode 100644 index 0000000..c56adcc --- /dev/null +++ b/app/chat/rpc/internal/logic/addChatMessagesLogic.go @@ -0,0 +1,69 @@ +package logic + +import ( + "context" + "errors" + + "juwan-backend/app/chat/rpc/internal/svc" + "juwan-backend/app/chat/rpc/pb" + + "github.com/zeromicro/go-zero/core/logx" +) + +type AddChatMessagesLogic struct { + ctx context.Context + svcCtx *svc.ServiceContext + logx.Logger +} + +func NewAddChatMessagesLogic(ctx context.Context, svcCtx *svc.ServiceContext) *AddChatMessagesLogic { + return &AddChatMessagesLogic{ + ctx: ctx, + svcCtx: svcCtx, + Logger: logx.WithContext(ctx), + } +} + +func (l *AddChatMessagesLogic) AddChatMessages(in *pb.AddChatMessagesReq) (*pb.AddChatMessagesResp, error) { + if in.GetSessionId() <= 0 { + return nil, errors.New("sessionId is required") + } + if in.GetSenderId() <= 0 { + return nil, errors.New("senderId is required") + } + if in.GetContent() == "" { + return nil, errors.New("content is required") + } + + store := l.svcCtx.Store + store.Mu.Lock() + defer store.Mu.Unlock() + + if _, ok := store.Sessions[in.GetSessionId()]; !ok { + return nil, errors.New("session not found") + } + + now := nowUnix(0) + msgType := in.GetType() + if msgType == "" { + msgType = "text" + } + + msg := &pb.ChatMessages{ + Id: store.NextMessage(), + SessionId: in.GetSessionId(), + SenderId: in.GetSenderId(), + Type: msgType, + Content: in.GetContent(), + CreatedAt: now, + } + store.Messages[msg.Id] = msg + store.SessionMessages[in.GetSessionId()] = append(store.SessionMessages[in.GetSessionId()], msg.Id) + + session := store.Sessions[in.GetSessionId()] + session.LastMessage = in.GetContent() + session.LastMessageAt = now + session.UpdatedAt = now + + return &pb.AddChatMessagesResp{Id: msg.Id}, nil +} diff --git a/app/chat/rpc/internal/logic/addChatSessionsLogic.go b/app/chat/rpc/internal/logic/addChatSessionsLogic.go new file mode 100644 index 0000000..fd777a4 --- /dev/null +++ b/app/chat/rpc/internal/logic/addChatSessionsLogic.go @@ -0,0 +1,64 @@ +package logic + +import ( + "context" + "errors" + + "juwan-backend/app/chat/rpc/internal/svc" + "juwan-backend/app/chat/rpc/pb" + + "github.com/zeromicro/go-zero/core/logx" +) + +type AddChatSessionsLogic struct { + ctx context.Context + svcCtx *svc.ServiceContext + logx.Logger +} + +func NewAddChatSessionsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *AddChatSessionsLogic { + return &AddChatSessionsLogic{ + ctx: ctx, + svcCtx: svcCtx, + Logger: logx.WithContext(ctx), + } +} + +func (l *AddChatSessionsLogic) AddChatSessions(in *pb.AddChatSessionsReq) (*pb.AddChatSessionsResp, error) { + if in.GetType() == "" { + return nil, errors.New("type is required") + } + if in.GetCreatorId() <= 0 { + return nil, errors.New("creatorId is required") + } + + store := l.svcCtx.Store + store.Mu.Lock() + defer store.Mu.Unlock() + + now := nowUnix(0) + participants := append([]int64(nil), in.GetParticipants()...) + hasCreator := false + for _, p := range participants { + if p == in.GetCreatorId() { + hasCreator = true + break + } + } + if !hasCreator { + participants = append(participants, in.GetCreatorId()) + } + + session := &pb.ChatSessions{ + Id: store.NextSession(), + Type: in.GetType(), + Name: in.GetName(), + CreatorId: in.GetCreatorId(), + Participants: participants, + CreatedAt: now, + UpdatedAt: now, + } + store.Sessions[session.Id] = session + + return &pb.AddChatSessionsResp{Id: session.Id}, nil +} diff --git a/app/chat/rpc/internal/logic/addParticipantLogic.go b/app/chat/rpc/internal/logic/addParticipantLogic.go new file mode 100644 index 0000000..95b3b4e --- /dev/null +++ b/app/chat/rpc/internal/logic/addParticipantLogic.go @@ -0,0 +1,46 @@ +package logic + +import ( + "context" + "errors" + + "juwan-backend/app/chat/rpc/internal/svc" + "juwan-backend/app/chat/rpc/pb" + + "github.com/zeromicro/go-zero/core/logx" +) + +type AddParticipantLogic struct { + ctx context.Context + svcCtx *svc.ServiceContext + logx.Logger +} + +func NewAddParticipantLogic(ctx context.Context, svcCtx *svc.ServiceContext) *AddParticipantLogic { + return &AddParticipantLogic{ + ctx: ctx, + svcCtx: svcCtx, + Logger: logx.WithContext(ctx), + } +} + +func (l *AddParticipantLogic) AddParticipant(in *pb.AddParticipantReq) (*pb.AddParticipantResp, error) { + store := l.svcCtx.Store + store.Mu.Lock() + defer store.Mu.Unlock() + + session, ok := store.Sessions[in.GetSessionId()] + if !ok { + return nil, errors.New("session not found") + } + + for _, p := range session.Participants { + if p == in.GetUserId() { + return &pb.AddParticipantResp{}, nil + } + } + session.Participants = append(session.Participants, in.GetUserId()) + session.UpdatedAt = nowUnix(0) + + return &pb.AddParticipantResp{}, nil +} diff --git a/app/chat/rpc/internal/logic/delChatMessagesLogic.go b/app/chat/rpc/internal/logic/delChatMessagesLogic.go new file mode 100644 index 0000000..100246b --- /dev/null +++ b/app/chat/rpc/internal/logic/delChatMessagesLogic.go @@ -0,0 +1,45 @@ +package logic + +import ( + "context" + + "juwan-backend/app/chat/rpc/internal/svc" + "juwan-backend/app/chat/rpc/pb" + + "github.com/zeromicro/go-zero/core/logx" +) + +type DelChatMessagesLogic struct { + ctx context.Context + svcCtx *svc.ServiceContext + logx.Logger +} + +func NewDelChatMessagesLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DelChatMessagesLogic { + return &DelChatMessagesLogic{ + ctx: ctx, + svcCtx: svcCtx, + Logger: logx.WithContext(ctx), + } +} + +func (l *DelChatMessagesLogic) DelChatMessages(in *pb.DelChatMessagesReq) (*pb.DelChatMessagesResp, error) { + store := l.svcCtx.Store + store.Mu.Lock() + defer store.Mu.Unlock() + + msg, ok := store.Messages[in.GetId()] + if ok { + ids := store.SessionMessages[msg.SessionId] + filtered := make([]int64, 0, len(ids)) + for _, id := range ids { + if id != in.GetId() { + filtered = append(filtered, id) + } + } + store.SessionMessages[msg.SessionId] = filtered + } + delete(store.Messages, in.GetId()) + + return &pb.DelChatMessagesResp{}, nil +} diff --git a/app/chat/rpc/internal/logic/delChatSessionsLogic.go b/app/chat/rpc/internal/logic/delChatSessionsLogic.go new file mode 100644 index 0000000..aeb29cd --- /dev/null +++ b/app/chat/rpc/internal/logic/delChatSessionsLogic.go @@ -0,0 +1,35 @@ +package logic + +import ( + "context" + + "juwan-backend/app/chat/rpc/internal/svc" + "juwan-backend/app/chat/rpc/pb" + + "github.com/zeromicro/go-zero/core/logx" +) + +type DelChatSessionsLogic struct { + ctx context.Context + svcCtx *svc.ServiceContext + logx.Logger +} + +func NewDelChatSessionsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DelChatSessionsLogic { + return &DelChatSessionsLogic{ + ctx: ctx, + svcCtx: svcCtx, + Logger: logx.WithContext(ctx), + } +} + +func (l *DelChatSessionsLogic) DelChatSessions(in *pb.DelChatSessionsReq) (*pb.DelChatSessionsResp, error) { + store := l.svcCtx.Store + store.Mu.Lock() + defer store.Mu.Unlock() + + delete(store.Sessions, in.GetId()) + delete(store.SessionMessages, in.GetId()) + + return &pb.DelChatSessionsResp{}, nil +} diff --git a/app/chat/rpc/internal/logic/getChatMessagesByIdLogic.go b/app/chat/rpc/internal/logic/getChatMessagesByIdLogic.go new file mode 100644 index 0000000..ac53deb --- /dev/null +++ b/app/chat/rpc/internal/logic/getChatMessagesByIdLogic.go @@ -0,0 +1,38 @@ +package logic + +import ( + "context" + "errors" + + "juwan-backend/app/chat/rpc/internal/svc" + "juwan-backend/app/chat/rpc/pb" + + "github.com/zeromicro/go-zero/core/logx" +) + +type GetChatMessagesByIdLogic struct { + ctx context.Context + svcCtx *svc.ServiceContext + logx.Logger +} + +func NewGetChatMessagesByIdLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetChatMessagesByIdLogic { + return &GetChatMessagesByIdLogic{ + ctx: ctx, + svcCtx: svcCtx, + Logger: logx.WithContext(ctx), + } +} + +func (l *GetChatMessagesByIdLogic) GetChatMessagesById(in *pb.GetChatMessagesByIdReq) (*pb.GetChatMessagesByIdResp, error) { + store := l.svcCtx.Store + store.Mu.RLock() + defer store.Mu.RUnlock() + + msg, ok := store.Messages[in.GetId()] + if !ok { + return nil, errors.New("message not found") + } + + return &pb.GetChatMessagesByIdResp{ChatMessages: msg}, nil +} diff --git a/app/chat/rpc/internal/logic/getChatSessionsByIdLogic.go b/app/chat/rpc/internal/logic/getChatSessionsByIdLogic.go new file mode 100644 index 0000000..c25ab2f --- /dev/null +++ b/app/chat/rpc/internal/logic/getChatSessionsByIdLogic.go @@ -0,0 +1,38 @@ +package logic + +import ( + "context" + "errors" + + "juwan-backend/app/chat/rpc/internal/svc" + "juwan-backend/app/chat/rpc/pb" + + "github.com/zeromicro/go-zero/core/logx" +) + +type GetChatSessionsByIdLogic struct { + ctx context.Context + svcCtx *svc.ServiceContext + logx.Logger +} + +func NewGetChatSessionsByIdLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetChatSessionsByIdLogic { + return &GetChatSessionsByIdLogic{ + ctx: ctx, + svcCtx: svcCtx, + Logger: logx.WithContext(ctx), + } +} + +func (l *GetChatSessionsByIdLogic) GetChatSessionsById(in *pb.GetChatSessionsByIdReq) (*pb.GetChatSessionsByIdResp, error) { + store := l.svcCtx.Store + store.Mu.RLock() + defer store.Mu.RUnlock() + + session, ok := store.Sessions[in.GetId()] + if !ok { + return nil, errors.New("session not found") + } + + return &pb.GetChatSessionsByIdResp{ChatSessions: session}, nil +} diff --git a/app/chat/rpc/internal/logic/helpers.go b/app/chat/rpc/internal/logic/helpers.go new file mode 100644 index 0000000..887f098 --- /dev/null +++ b/app/chat/rpc/internal/logic/helpers.go @@ -0,0 +1,10 @@ +package logic + +import "time" + +func nowUnix(ts int64) int64 { + if ts > 0 { + return ts + } + return time.Now().Unix() +} diff --git a/app/chat/rpc/internal/logic/removeParticipantLogic.go b/app/chat/rpc/internal/logic/removeParticipantLogic.go new file mode 100644 index 0000000..a7fa9f5 --- /dev/null +++ b/app/chat/rpc/internal/logic/removeParticipantLogic.go @@ -0,0 +1,47 @@ +package logic + +import ( + "context" + "errors" + + "juwan-backend/app/chat/rpc/internal/svc" + "juwan-backend/app/chat/rpc/pb" + + "github.com/zeromicro/go-zero/core/logx" +) + +type RemoveParticipantLogic struct { + ctx context.Context + svcCtx *svc.ServiceContext + logx.Logger +} + +func NewRemoveParticipantLogic(ctx context.Context, svcCtx *svc.ServiceContext) *RemoveParticipantLogic { + return &RemoveParticipantLogic{ + ctx: ctx, + svcCtx: svcCtx, + Logger: logx.WithContext(ctx), + } +} + +func (l *RemoveParticipantLogic) RemoveParticipant(in *pb.RemoveParticipantReq) (*pb.RemoveParticipantResp, error) { + store := l.svcCtx.Store + store.Mu.Lock() + defer store.Mu.Unlock() + + session, ok := store.Sessions[in.GetSessionId()] + if !ok { + return nil, errors.New("session not found") + } + + filtered := make([]int64, 0, len(session.Participants)) + for _, p := range session.Participants { + if p != in.GetUserId() { + filtered = append(filtered, p) + } + } + session.Participants = filtered + session.UpdatedAt = nowUnix(0) + + return &pb.RemoveParticipantResp{}, nil +} diff --git a/app/chat/rpc/internal/logic/searchChatMessagesLogic.go b/app/chat/rpc/internal/logic/searchChatMessagesLogic.go new file mode 100644 index 0000000..013430f --- /dev/null +++ b/app/chat/rpc/internal/logic/searchChatMessagesLogic.go @@ -0,0 +1,58 @@ +package logic + +import ( + "context" + + "juwan-backend/app/chat/rpc/internal/svc" + "juwan-backend/app/chat/rpc/pb" + + "github.com/zeromicro/go-zero/core/logx" +) + +type SearchChatMessagesLogic struct { + ctx context.Context + svcCtx *svc.ServiceContext + logx.Logger +} + +func NewSearchChatMessagesLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SearchChatMessagesLogic { + return &SearchChatMessagesLogic{ + ctx: ctx, + svcCtx: svcCtx, + Logger: logx.WithContext(ctx), + } +} + +func (l *SearchChatMessagesLogic) SearchChatMessages(in *pb.SearchChatMessagesReq) (*pb.SearchChatMessagesResp, error) { + store := l.svcCtx.Store + store.Mu.RLock() + defer store.Mu.RUnlock() + + msgIDs := store.SessionMessages[in.GetSessionId()] + var results []*pb.ChatMessages + for _, id := range msgIDs { + msg, ok := store.Messages[id] + if !ok { + continue + } + if in.SenderId != nil && msg.SenderId != *in.SenderId { + continue + } + results = append(results, msg) + } + + limit := in.GetLimit() + if limit <= 0 { + limit = 20 + } + offset := in.GetPage() * limit + if offset >= int64(len(results)) { + return &pb.SearchChatMessagesResp{}, nil + } + end := offset + limit + if end > int64(len(results)) { + end = int64(len(results)) + } + + return &pb.SearchChatMessagesResp{ChatMessages: results[offset:end]}, nil +} diff --git a/app/chat/rpc/internal/logic/searchChatSessionsLogic.go b/app/chat/rpc/internal/logic/searchChatSessionsLogic.go new file mode 100644 index 0000000..73267dd --- /dev/null +++ b/app/chat/rpc/internal/logic/searchChatSessionsLogic.go @@ -0,0 +1,65 @@ +package logic + +import ( + "context" + + "juwan-backend/app/chat/rpc/internal/svc" + "juwan-backend/app/chat/rpc/pb" + + "github.com/zeromicro/go-zero/core/logx" +) + +type SearchChatSessionsLogic struct { + ctx context.Context + svcCtx *svc.ServiceContext + logx.Logger +} + +func NewSearchChatSessionsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SearchChatSessionsLogic { + return &SearchChatSessionsLogic{ + ctx: ctx, + svcCtx: svcCtx, + Logger: logx.WithContext(ctx), + } +} + +func (l *SearchChatSessionsLogic) SearchChatSessions(in *pb.SearchChatSessionsReq) (*pb.SearchChatSessionsResp, error) { + store := l.svcCtx.Store + store.Mu.RLock() + defer store.Mu.RUnlock() + + var results []*pb.ChatSessions + for _, s := range store.Sessions { + if in.Type != nil && s.Type != *in.Type { + continue + } + if in.UserId != nil { + found := false + for _, p := range s.Participants { + if p == *in.UserId { + found = true + break + } + } + if !found { + continue + } + } + results = append(results, s) + } + + limit := in.GetLimit() + if limit <= 0 { + limit = 20 + } + offset := in.GetPage() * limit + if offset >= int64(len(results)) { + return &pb.SearchChatSessionsResp{}, nil + } + end := offset + limit + if end > int64(len(results)) { + end = int64(len(results)) + } + + return &pb.SearchChatSessionsResp{ChatSessions: results[offset:end]}, nil +} diff --git a/app/chat/rpc/internal/logic/updateChatSessionsLogic.go b/app/chat/rpc/internal/logic/updateChatSessionsLogic.go new file mode 100644 index 0000000..31d81de --- /dev/null +++ b/app/chat/rpc/internal/logic/updateChatSessionsLogic.go @@ -0,0 +1,49 @@ +package logic + +import ( + "context" + "errors" + + "juwan-backend/app/chat/rpc/internal/svc" + "juwan-backend/app/chat/rpc/pb" + + "github.com/zeromicro/go-zero/core/logx" +) + +type UpdateChatSessionsLogic struct { + ctx context.Context + svcCtx *svc.ServiceContext + logx.Logger +} + +func NewUpdateChatSessionsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *UpdateChatSessionsLogic { + return &UpdateChatSessionsLogic{ + ctx: ctx, + svcCtx: svcCtx, + Logger: logx.WithContext(ctx), + } +} + +func (l *UpdateChatSessionsLogic) UpdateChatSessions(in *pb.UpdateChatSessionsReq) (*pb.UpdateChatSessionsResp, error) { + store := l.svcCtx.Store + store.Mu.Lock() + defer store.Mu.Unlock() + + session, ok := store.Sessions[in.GetId()] + if !ok { + return nil, errors.New("session not found") + } + + if in.Name != nil { + session.Name = *in.Name + } + if in.LastMessage != nil { + session.LastMessage = *in.LastMessage + } + if in.LastMessageAt != nil { + session.LastMessageAt = *in.LastMessageAt + } + session.UpdatedAt = nowUnix(0) + + return &pb.UpdateChatSessionsResp{}, nil +} diff --git a/app/chat/rpc/internal/server/chatServiceServer.go b/app/chat/rpc/internal/server/chatServiceServer.go new file mode 100644 index 0000000..839a2a4 --- /dev/null +++ b/app/chat/rpc/internal/server/chatServiceServer.go @@ -0,0 +1,75 @@ +package server + +import ( + "context" + + "juwan-backend/app/chat/rpc/internal/logic" + "juwan-backend/app/chat/rpc/internal/svc" + "juwan-backend/app/chat/rpc/pb" +) + +type ChatServiceServer struct { + svcCtx *svc.ServiceContext + pb.UnimplementedChatServiceServer +} + +func NewChatServiceServer(svcCtx *svc.ServiceContext) *ChatServiceServer { + return &ChatServiceServer{ + svcCtx: svcCtx, + } +} + +func (s *ChatServiceServer) AddChatSessions(ctx context.Context, in *pb.AddChatSessionsReq) (*pb.AddChatSessionsResp, error) { + l := logic.NewAddChatSessionsLogic(ctx, s.svcCtx) + return l.AddChatSessions(in) +} + +func (s *ChatServiceServer) UpdateChatSessions(ctx context.Context, in *pb.UpdateChatSessionsReq) (*pb.UpdateChatSessionsResp, error) { + l := logic.NewUpdateChatSessionsLogic(ctx, s.svcCtx) + return l.UpdateChatSessions(in) +} + +func (s *ChatServiceServer) DelChatSessions(ctx context.Context, in *pb.DelChatSessionsReq) (*pb.DelChatSessionsResp, error) { + l := logic.NewDelChatSessionsLogic(ctx, s.svcCtx) + return l.DelChatSessions(in) +} + +func (s *ChatServiceServer) GetChatSessionsById(ctx context.Context, in *pb.GetChatSessionsByIdReq) (*pb.GetChatSessionsByIdResp, error) { + l := logic.NewGetChatSessionsByIdLogic(ctx, s.svcCtx) + return l.GetChatSessionsById(in) +} + +func (s *ChatServiceServer) SearchChatSessions(ctx context.Context, in *pb.SearchChatSessionsReq) (*pb.SearchChatSessionsResp, error) { + l := logic.NewSearchChatSessionsLogic(ctx, s.svcCtx) + return l.SearchChatSessions(in) +} + +func (s *ChatServiceServer) AddParticipant(ctx context.Context, in *pb.AddParticipantReq) (*pb.AddParticipantResp, error) { + l := logic.NewAddParticipantLogic(ctx, s.svcCtx) + return l.AddParticipant(in) +} + +func (s *ChatServiceServer) RemoveParticipant(ctx context.Context, in *pb.RemoveParticipantReq) (*pb.RemoveParticipantResp, error) { + l := logic.NewRemoveParticipantLogic(ctx, s.svcCtx) + return l.RemoveParticipant(in) +} + +func (s *ChatServiceServer) AddChatMessages(ctx context.Context, in *pb.AddChatMessagesReq) (*pb.AddChatMessagesResp, error) { + l := logic.NewAddChatMessagesLogic(ctx, s.svcCtx) + return l.AddChatMessages(in) +} + +func (s *ChatServiceServer) DelChatMessages(ctx context.Context, in *pb.DelChatMessagesReq) (*pb.DelChatMessagesResp, error) { + l := logic.NewDelChatMessagesLogic(ctx, s.svcCtx) + return l.DelChatMessages(in) +} + +func (s *ChatServiceServer) GetChatMessagesById(ctx context.Context, in *pb.GetChatMessagesByIdReq) (*pb.GetChatMessagesByIdResp, error) { + l := logic.NewGetChatMessagesByIdLogic(ctx, s.svcCtx) + return l.GetChatMessagesById(in) +} + +func (s *ChatServiceServer) SearchChatMessages(ctx context.Context, in *pb.SearchChatMessagesReq) (*pb.SearchChatMessagesResp, error) { + l := logic.NewSearchChatMessagesLogic(ctx, s.svcCtx) + return l.SearchChatMessages(in) +} diff --git a/app/chat/rpc/internal/svc/serviceContext.go b/app/chat/rpc/internal/svc/serviceContext.go new file mode 100644 index 0000000..85819fc --- /dev/null +++ b/app/chat/rpc/internal/svc/serviceContext.go @@ -0,0 +1,15 @@ +package svc + +import "juwan-backend/app/chat/rpc/internal/config" + +type ServiceContext struct { + Config config.Config + Store *ChatStore +} + +func NewServiceContext(c config.Config) *ServiceContext { + return &ServiceContext{ + Config: c, + Store: NewChatStore(), + } +} diff --git a/app/chat/rpc/internal/svc/store.go b/app/chat/rpc/internal/svc/store.go new file mode 100644 index 0000000..a5403dc --- /dev/null +++ b/app/chat/rpc/internal/svc/store.go @@ -0,0 +1,38 @@ +package svc + +import ( + "sync" + + "juwan-backend/app/chat/rpc/pb" +) + +type ChatStore struct { + Mu sync.RWMutex + + nextSessionID int64 + nextMessageID int64 + + Sessions map[int64]*pb.ChatSessions + Messages map[int64]*pb.ChatMessages + SessionMessages map[int64][]int64 +} + +func NewChatStore() *ChatStore { + return &ChatStore{ + nextSessionID: 1000, + nextMessageID: 1000, + Sessions: make(map[int64]*pb.ChatSessions), + Messages: make(map[int64]*pb.ChatMessages), + SessionMessages: make(map[int64][]int64), + } +} + +func (s *ChatStore) NextSession() int64 { + s.nextSessionID++ + return s.nextSessionID +} + +func (s *ChatStore) NextMessage() int64 { + s.nextMessageID++ + return s.nextMessageID +} diff --git a/app/chat/rpc/pb.go b/app/chat/rpc/pb.go new file mode 100644 index 0000000..52b80b7 --- /dev/null +++ b/app/chat/rpc/pb.go @@ -0,0 +1,39 @@ +package main + +import ( + "flag" + "fmt" + + "juwan-backend/app/chat/rpc/internal/config" + "juwan-backend/app/chat/rpc/internal/server" + "juwan-backend/app/chat/rpc/internal/svc" + "juwan-backend/app/chat/rpc/pb" + + "github.com/zeromicro/go-zero/core/conf" + "github.com/zeromicro/go-zero/core/service" + "github.com/zeromicro/go-zero/zrpc" + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" +) + +var configFile = flag.String("f", "etc/pb.yaml", "the config file") + +func main() { + flag.Parse() + + var c config.Config + conf.MustLoad(*configFile, &c) + ctx := svc.NewServiceContext(c) + + s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) { + pb.RegisterChatServiceServer(grpcServer, server.NewChatServiceServer(ctx)) + + if c.Mode == service.DevMode || c.Mode == service.TestMode { + reflection.Register(grpcServer) + } + }) + defer s.Stop() + + fmt.Printf("Starting rpc server at %s...\n", c.ListenOn) + s.Start() +} diff --git a/app/chat/rpc/pb/chat_grpc.go b/app/chat/rpc/pb/chat_grpc.go new file mode 100644 index 0000000..1982f65 --- /dev/null +++ b/app/chat/rpc/pb/chat_grpc.go @@ -0,0 +1,175 @@ +package pb + +import ( + "context" + + "google.golang.org/grpc" +) + +type ChatServiceServer interface { + AddChatSessions(context.Context, *AddChatSessionsReq) (*AddChatSessionsResp, error) + UpdateChatSessions(context.Context, *UpdateChatSessionsReq) (*UpdateChatSessionsResp, error) + DelChatSessions(context.Context, *DelChatSessionsReq) (*DelChatSessionsResp, error) + GetChatSessionsById(context.Context, *GetChatSessionsByIdReq) (*GetChatSessionsByIdResp, error) + SearchChatSessions(context.Context, *SearchChatSessionsReq) (*SearchChatSessionsResp, error) + AddParticipant(context.Context, *AddParticipantReq) (*AddParticipantResp, error) + RemoveParticipant(context.Context, *RemoveParticipantReq) (*RemoveParticipantResp, error) + AddChatMessages(context.Context, *AddChatMessagesReq) (*AddChatMessagesResp, error) + DelChatMessages(context.Context, *DelChatMessagesReq) (*DelChatMessagesResp, error) + GetChatMessagesById(context.Context, *GetChatMessagesByIdReq) (*GetChatMessagesByIdResp, error) + SearchChatMessages(context.Context, *SearchChatMessagesReq) (*SearchChatMessagesResp, error) + mustEmbedUnimplementedChatServiceServer() +} + +type UnimplementedChatServiceServer struct{} + +func (UnimplementedChatServiceServer) AddChatSessions(context.Context, *AddChatSessionsReq) (*AddChatSessionsResp, error) { + return nil, grpc.Errorf(12, "method AddChatSessions not implemented") +} +func (UnimplementedChatServiceServer) UpdateChatSessions(context.Context, *UpdateChatSessionsReq) (*UpdateChatSessionsResp, error) { + return nil, grpc.Errorf(12, "method UpdateChatSessions not implemented") +} +func (UnimplementedChatServiceServer) DelChatSessions(context.Context, *DelChatSessionsReq) (*DelChatSessionsResp, error) { + return nil, grpc.Errorf(12, "method DelChatSessions not implemented") +} +func (UnimplementedChatServiceServer) GetChatSessionsById(context.Context, *GetChatSessionsByIdReq) (*GetChatSessionsByIdResp, error) { + return nil, grpc.Errorf(12, "method GetChatSessionsById not implemented") +} +func (UnimplementedChatServiceServer) SearchChatSessions(context.Context, *SearchChatSessionsReq) (*SearchChatSessionsResp, error) { + return nil, grpc.Errorf(12, "method SearchChatSessions not implemented") +} +func (UnimplementedChatServiceServer) AddParticipant(context.Context, *AddParticipantReq) (*AddParticipantResp, error) { + return nil, grpc.Errorf(12, "method AddParticipant not implemented") +} +func (UnimplementedChatServiceServer) RemoveParticipant(context.Context, *RemoveParticipantReq) (*RemoveParticipantResp, error) { + return nil, grpc.Errorf(12, "method RemoveParticipant not implemented") +} +func (UnimplementedChatServiceServer) AddChatMessages(context.Context, *AddChatMessagesReq) (*AddChatMessagesResp, error) { + return nil, grpc.Errorf(12, "method AddChatMessages not implemented") +} +func (UnimplementedChatServiceServer) DelChatMessages(context.Context, *DelChatMessagesReq) (*DelChatMessagesResp, error) { + return nil, grpc.Errorf(12, "method DelChatMessages not implemented") +} +func (UnimplementedChatServiceServer) GetChatMessagesById(context.Context, *GetChatMessagesByIdReq) (*GetChatMessagesByIdResp, error) { + return nil, grpc.Errorf(12, "method GetChatMessagesById not implemented") +} +func (UnimplementedChatServiceServer) SearchChatMessages(context.Context, *SearchChatMessagesReq) (*SearchChatMessagesResp, error) { + return nil, grpc.Errorf(12, "method SearchChatMessages not implemented") +} +func (UnimplementedChatServiceServer) mustEmbedUnimplementedChatServiceServer() {} + +type UnsafeChatServiceServer interface { + mustEmbedUnimplementedChatServiceServer() +} + +type ChatServiceClient interface { + AddChatSessions(ctx context.Context, in *AddChatSessionsReq, opts ...grpc.CallOption) (*AddChatSessionsResp, error) + UpdateChatSessions(ctx context.Context, in *UpdateChatSessionsReq, opts ...grpc.CallOption) (*UpdateChatSessionsResp, error) + DelChatSessions(ctx context.Context, in *DelChatSessionsReq, opts ...grpc.CallOption) (*DelChatSessionsResp, error) + GetChatSessionsById(ctx context.Context, in *GetChatSessionsByIdReq, opts ...grpc.CallOption) (*GetChatSessionsByIdResp, error) + SearchChatSessions(ctx context.Context, in *SearchChatSessionsReq, opts ...grpc.CallOption) (*SearchChatSessionsResp, error) + AddParticipant(ctx context.Context, in *AddParticipantReq, opts ...grpc.CallOption) (*AddParticipantResp, error) + RemoveParticipant(ctx context.Context, in *RemoveParticipantReq, opts ...grpc.CallOption) (*RemoveParticipantResp, error) + AddChatMessages(ctx context.Context, in *AddChatMessagesReq, opts ...grpc.CallOption) (*AddChatMessagesResp, error) + DelChatMessages(ctx context.Context, in *DelChatMessagesReq, opts ...grpc.CallOption) (*DelChatMessagesResp, error) + GetChatMessagesById(ctx context.Context, in *GetChatMessagesByIdReq, opts ...grpc.CallOption) (*GetChatMessagesByIdResp, error) + SearchChatMessages(ctx context.Context, in *SearchChatMessagesReq, opts ...grpc.CallOption) (*SearchChatMessagesResp, error) +} + +type chatServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewChatServiceClient(cc grpc.ClientConnInterface) ChatServiceClient { + return &chatServiceClient{cc} +} + +func (c *chatServiceClient) AddChatSessions(ctx context.Context, in *AddChatSessionsReq, opts ...grpc.CallOption) (*AddChatSessionsResp, error) { + out := new(AddChatSessionsResp) + err := c.cc.Invoke(ctx, "/pb.chatService/AddChatSessions", in, out, opts...) + return out, err +} + +func (c *chatServiceClient) UpdateChatSessions(ctx context.Context, in *UpdateChatSessionsReq, opts ...grpc.CallOption) (*UpdateChatSessionsResp, error) { + out := new(UpdateChatSessionsResp) + err := c.cc.Invoke(ctx, "/pb.chatService/UpdateChatSessions", in, out, opts...) + return out, err +} + +func (c *chatServiceClient) DelChatSessions(ctx context.Context, in *DelChatSessionsReq, opts ...grpc.CallOption) (*DelChatSessionsResp, error) { + out := new(DelChatSessionsResp) + err := c.cc.Invoke(ctx, "/pb.chatService/DelChatSessions", in, out, opts...) + return out, err +} + +func (c *chatServiceClient) GetChatSessionsById(ctx context.Context, in *GetChatSessionsByIdReq, opts ...grpc.CallOption) (*GetChatSessionsByIdResp, error) { + out := new(GetChatSessionsByIdResp) + err := c.cc.Invoke(ctx, "/pb.chatService/GetChatSessionsById", in, out, opts...) + return out, err +} + +func (c *chatServiceClient) SearchChatSessions(ctx context.Context, in *SearchChatSessionsReq, opts ...grpc.CallOption) (*SearchChatSessionsResp, error) { + out := new(SearchChatSessionsResp) + err := c.cc.Invoke(ctx, "/pb.chatService/SearchChatSessions", in, out, opts...) + return out, err +} + +func (c *chatServiceClient) AddParticipant(ctx context.Context, in *AddParticipantReq, opts ...grpc.CallOption) (*AddParticipantResp, error) { + out := new(AddParticipantResp) + err := c.cc.Invoke(ctx, "/pb.chatService/AddParticipant", in, out, opts...) + return out, err +} + +func (c *chatServiceClient) RemoveParticipant(ctx context.Context, in *RemoveParticipantReq, opts ...grpc.CallOption) (*RemoveParticipantResp, error) { + out := new(RemoveParticipantResp) + err := c.cc.Invoke(ctx, "/pb.chatService/RemoveParticipant", in, out, opts...) + return out, err +} + +func (c *chatServiceClient) AddChatMessages(ctx context.Context, in *AddChatMessagesReq, opts ...grpc.CallOption) (*AddChatMessagesResp, error) { + out := new(AddChatMessagesResp) + err := c.cc.Invoke(ctx, "/pb.chatService/AddChatMessages", in, out, opts...) + return out, err +} + +func (c *chatServiceClient) DelChatMessages(ctx context.Context, in *DelChatMessagesReq, opts ...grpc.CallOption) (*DelChatMessagesResp, error) { + out := new(DelChatMessagesResp) + err := c.cc.Invoke(ctx, "/pb.chatService/DelChatMessages", in, out, opts...) + return out, err +} + +func (c *chatServiceClient) GetChatMessagesById(ctx context.Context, in *GetChatMessagesByIdReq, opts ...grpc.CallOption) (*GetChatMessagesByIdResp, error) { + out := new(GetChatMessagesByIdResp) + err := c.cc.Invoke(ctx, "/pb.chatService/GetChatMessagesById", in, out, opts...) + return out, err +} + +func (c *chatServiceClient) SearchChatMessages(ctx context.Context, in *SearchChatMessagesReq, opts ...grpc.CallOption) (*SearchChatMessagesResp, error) { + out := new(SearchChatMessagesResp) + err := c.cc.Invoke(ctx, "/pb.chatService/SearchChatMessages", in, out, opts...) + return out, err +} + +var ChatService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "pb.chatService", + HandlerType: (*ChatServiceServer)(nil), + Methods: []grpc.MethodDesc{ + {MethodName: "AddChatSessions", Handler: _ChatService_AddChatSessions_Handler}, + {MethodName: "UpdateChatSessions", Handler: _ChatService_UpdateChatSessions_Handler}, + {MethodName: "DelChatSessions", Handler: _ChatService_DelChatSessions_Handler}, + {MethodName: "GetChatSessionsById", Handler: _ChatService_GetChatSessionsById_Handler}, + {MethodName: "SearchChatSessions", Handler: _ChatService_SearchChatSessions_Handler}, + {MethodName: "AddParticipant", Handler: _ChatService_AddParticipant_Handler}, + {MethodName: "RemoveParticipant", Handler: _ChatService_RemoveParticipant_Handler}, + {MethodName: "AddChatMessages", Handler: _ChatService_AddChatMessages_Handler}, + {MethodName: "DelChatMessages", Handler: _ChatService_DelChatMessages_Handler}, + {MethodName: "GetChatMessagesById", Handler: _ChatService_GetChatMessagesById_Handler}, + {MethodName: "SearchChatMessages", Handler: _ChatService_SearchChatMessages_Handler}, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "chat.proto", +} + +func RegisterChatServiceServer(s grpc.ServiceRegistrar, srv ChatServiceServer) { + s.RegisterService(&ChatService_ServiceDesc, srv) +} diff --git a/app/chat/rpc/pb/chat_grpc_handlers.go b/app/chat/rpc/pb/chat_grpc_handlers.go new file mode 100644 index 0000000..4fe73b7 --- /dev/null +++ b/app/chat/rpc/pb/chat_grpc_handlers.go @@ -0,0 +1,161 @@ +package pb + +import ( + "context" + + "google.golang.org/grpc" +) + +func _ChatService_AddChatSessions_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(AddChatSessionsReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ChatServiceServer).AddChatSessions(ctx, in) + } + info := &grpc.UnaryServerInfo{Server: srv, FullMethod: "/pb.chatService/AddChatSessions"} + return interceptor(ctx, in, info, func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ChatServiceServer).AddChatSessions(ctx, req.(*AddChatSessionsReq)) + }) +} + +func _ChatService_UpdateChatSessions_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(UpdateChatSessionsReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ChatServiceServer).UpdateChatSessions(ctx, in) + } + info := &grpc.UnaryServerInfo{Server: srv, FullMethod: "/pb.chatService/UpdateChatSessions"} + return interceptor(ctx, in, info, func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ChatServiceServer).UpdateChatSessions(ctx, req.(*UpdateChatSessionsReq)) + }) +} + +func _ChatService_DelChatSessions_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(DelChatSessionsReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ChatServiceServer).DelChatSessions(ctx, in) + } + info := &grpc.UnaryServerInfo{Server: srv, FullMethod: "/pb.chatService/DelChatSessions"} + return interceptor(ctx, in, info, func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ChatServiceServer).DelChatSessions(ctx, req.(*DelChatSessionsReq)) + }) +} + +func _ChatService_GetChatSessionsById_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetChatSessionsByIdReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ChatServiceServer).GetChatSessionsById(ctx, in) + } + info := &grpc.UnaryServerInfo{Server: srv, FullMethod: "/pb.chatService/GetChatSessionsById"} + return interceptor(ctx, in, info, func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ChatServiceServer).GetChatSessionsById(ctx, req.(*GetChatSessionsByIdReq)) + }) +} + +func _ChatService_SearchChatSessions_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(SearchChatSessionsReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ChatServiceServer).SearchChatSessions(ctx, in) + } + info := &grpc.UnaryServerInfo{Server: srv, FullMethod: "/pb.chatService/SearchChatSessions"} + return interceptor(ctx, in, info, func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ChatServiceServer).SearchChatSessions(ctx, req.(*SearchChatSessionsReq)) + }) +} + +func _ChatService_AddParticipant_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(AddParticipantReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ChatServiceServer).AddParticipant(ctx, in) + } + info := &grpc.UnaryServerInfo{Server: srv, FullMethod: "/pb.chatService/AddParticipant"} + return interceptor(ctx, in, info, func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ChatServiceServer).AddParticipant(ctx, req.(*AddParticipantReq)) + }) +} + +func _ChatService_RemoveParticipant_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(RemoveParticipantReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ChatServiceServer).RemoveParticipant(ctx, in) + } + info := &grpc.UnaryServerInfo{Server: srv, FullMethod: "/pb.chatService/RemoveParticipant"} + return interceptor(ctx, in, info, func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ChatServiceServer).RemoveParticipant(ctx, req.(*RemoveParticipantReq)) + }) +} + +func _ChatService_AddChatMessages_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(AddChatMessagesReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ChatServiceServer).AddChatMessages(ctx, in) + } + info := &grpc.UnaryServerInfo{Server: srv, FullMethod: "/pb.chatService/AddChatMessages"} + return interceptor(ctx, in, info, func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ChatServiceServer).AddChatMessages(ctx, req.(*AddChatMessagesReq)) + }) +} + +func _ChatService_DelChatMessages_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(DelChatMessagesReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ChatServiceServer).DelChatMessages(ctx, in) + } + info := &grpc.UnaryServerInfo{Server: srv, FullMethod: "/pb.chatService/DelChatMessages"} + return interceptor(ctx, in, info, func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ChatServiceServer).DelChatMessages(ctx, req.(*DelChatMessagesReq)) + }) +} + +func _ChatService_GetChatMessagesById_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetChatMessagesByIdReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ChatServiceServer).GetChatMessagesById(ctx, in) + } + info := &grpc.UnaryServerInfo{Server: srv, FullMethod: "/pb.chatService/GetChatMessagesById"} + return interceptor(ctx, in, info, func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ChatServiceServer).GetChatMessagesById(ctx, req.(*GetChatMessagesByIdReq)) + }) +} + +func _ChatService_SearchChatMessages_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(SearchChatMessagesReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ChatServiceServer).SearchChatMessages(ctx, in) + } + info := &grpc.UnaryServerInfo{Server: srv, FullMethod: "/pb.chatService/SearchChatMessages"} + return interceptor(ctx, in, info, func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ChatServiceServer).SearchChatMessages(ctx, req.(*SearchChatMessagesReq)) + }) +} diff --git a/app/chat/rpc/pb/chat_messages.go b/app/chat/rpc/pb/chat_messages.go new file mode 100644 index 0000000..0b9eb5a --- /dev/null +++ b/app/chat/rpc/pb/chat_messages.go @@ -0,0 +1,179 @@ +package pb + +type ChatMessages struct { + Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + SessionId int64 `protobuf:"varint,2,opt,name=sessionId,proto3" json:"sessionId,omitempty"` + SenderId int64 `protobuf:"varint,3,opt,name=senderId,proto3" json:"senderId,omitempty"` + Type string `protobuf:"bytes,4,opt,name=type,proto3" json:"type,omitempty"` + Content string `protobuf:"bytes,5,opt,name=content,proto3" json:"content,omitempty"` + CreatedAt int64 `protobuf:"varint,6,opt,name=createdAt,proto3" json:"createdAt,omitempty"` +} + +func (x *ChatMessages) GetId() int64 { + if x != nil { + return x.Id + } + return 0 +} + +func (x *ChatMessages) GetSessionId() int64 { + if x != nil { + return x.SessionId + } + return 0 +} + +func (x *ChatMessages) GetSenderId() int64 { + if x != nil { + return x.SenderId + } + return 0 +} + +func (x *ChatMessages) GetType() string { + if x != nil { + return x.Type + } + return "" +} + +func (x *ChatMessages) GetContent() string { + if x != nil { + return x.Content + } + return "" +} + +func (x *ChatMessages) GetCreatedAt() int64 { + if x != nil { + return x.CreatedAt + } + return 0 +} + +type AddChatMessagesReq struct { + SessionId int64 `protobuf:"varint,1,opt,name=sessionId,proto3" json:"sessionId,omitempty"` + SenderId int64 `protobuf:"varint,2,opt,name=senderId,proto3" json:"senderId,omitempty"` + Type string `protobuf:"bytes,3,opt,name=type,proto3" json:"type,omitempty"` + Content string `protobuf:"bytes,4,opt,name=content,proto3" json:"content,omitempty"` +} + +func (x *AddChatMessagesReq) GetSessionId() int64 { + if x != nil { + return x.SessionId + } + return 0 +} + +func (x *AddChatMessagesReq) GetSenderId() int64 { + if x != nil { + return x.SenderId + } + return 0 +} + +func (x *AddChatMessagesReq) GetType() string { + if x != nil { + return x.Type + } + return "" +} + +func (x *AddChatMessagesReq) GetContent() string { + if x != nil { + return x.Content + } + return "" +} + +type AddChatMessagesResp struct { + Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` +} + +func (x *AddChatMessagesResp) GetId() int64 { + if x != nil { + return x.Id + } + return 0 +} + +type DelChatMessagesReq struct { + Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` +} + +func (x *DelChatMessagesReq) GetId() int64 { + if x != nil { + return x.Id + } + return 0 +} + +type DelChatMessagesResp struct{} + +type GetChatMessagesByIdReq struct { + Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` +} + +func (x *GetChatMessagesByIdReq) GetId() int64 { + if x != nil { + return x.Id + } + return 0 +} + +type GetChatMessagesByIdResp struct { + ChatMessages *ChatMessages `protobuf:"bytes,1,opt,name=chatMessages,proto3" json:"chatMessages,omitempty"` +} + +func (x *GetChatMessagesByIdResp) GetChatMessages() *ChatMessages { + if x != nil { + return x.ChatMessages + } + return nil +} + +type SearchChatMessagesReq struct { + Page int64 `protobuf:"varint,1,opt,name=page,proto3" json:"page,omitempty"` + Limit int64 `protobuf:"varint,2,opt,name=limit,proto3" json:"limit,omitempty"` + SessionId int64 `protobuf:"varint,3,opt,name=sessionId,proto3" json:"sessionId,omitempty"` + SenderId *int64 `protobuf:"varint,4,opt,name=senderId,proto3,oneof" json:"senderId,omitempty"` +} + +func (x *SearchChatMessagesReq) GetPage() int64 { + if x != nil { + return x.Page + } + return 0 +} + +func (x *SearchChatMessagesReq) GetLimit() int64 { + if x != nil { + return x.Limit + } + return 0 +} + +func (x *SearchChatMessagesReq) GetSessionId() int64 { + if x != nil { + return x.SessionId + } + return 0 +} + +func (x *SearchChatMessagesReq) GetSenderId() *int64 { + if x != nil { + return x.SenderId + } + return nil +} + +type SearchChatMessagesResp struct { + ChatMessages []*ChatMessages `protobuf:"bytes,1,rep,name=chatMessages,proto3" json:"chatMessages,omitempty"` +} + +func (x *SearchChatMessagesResp) GetChatMessages() []*ChatMessages { + if x != nil { + return x.ChatMessages + } + return nil +} diff --git a/app/chat/rpc/pb/chat_sessions.go b/app/chat/rpc/pb/chat_sessions.go new file mode 100644 index 0000000..b40cfc7 --- /dev/null +++ b/app/chat/rpc/pb/chat_sessions.go @@ -0,0 +1,282 @@ +package pb + +type ChatSessions struct { + Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + Type string `protobuf:"bytes,2,opt,name=type,proto3" json:"type,omitempty"` + Name string `protobuf:"bytes,3,opt,name=name,proto3" json:"name,omitempty"` + CreatorId int64 `protobuf:"varint,4,opt,name=creatorId,proto3" json:"creatorId,omitempty"` + Participants []int64 `protobuf:"varint,5,rep,packed,name=participants,proto3" json:"participants,omitempty"` + LastMessage string `protobuf:"bytes,6,opt,name=lastMessage,proto3" json:"lastMessage,omitempty"` + LastMessageAt int64 `protobuf:"varint,7,opt,name=lastMessageAt,proto3" json:"lastMessageAt,omitempty"` + CreatedAt int64 `protobuf:"varint,8,opt,name=createdAt,proto3" json:"createdAt,omitempty"` + UpdatedAt int64 `protobuf:"varint,9,opt,name=updatedAt,proto3" json:"updatedAt,omitempty"` +} + +func (x *ChatSessions) GetId() int64 { + if x != nil { + return x.Id + } + return 0 +} + +func (x *ChatSessions) GetType() string { + if x != nil { + return x.Type + } + return "" +} + +func (x *ChatSessions) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *ChatSessions) GetCreatorId() int64 { + if x != nil { + return x.CreatorId + } + return 0 +} + +func (x *ChatSessions) GetParticipants() []int64 { + if x != nil { + return x.Participants + } + return nil +} + +func (x *ChatSessions) GetLastMessage() string { + if x != nil { + return x.LastMessage + } + return "" +} + +func (x *ChatSessions) GetLastMessageAt() int64 { + if x != nil { + return x.LastMessageAt + } + return 0 +} + +func (x *ChatSessions) GetCreatedAt() int64 { + if x != nil { + return x.CreatedAt + } + return 0 +} + +func (x *ChatSessions) GetUpdatedAt() int64 { + if x != nil { + return x.UpdatedAt + } + return 0 +} + +type AddChatSessionsReq struct { + Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"` + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + CreatorId int64 `protobuf:"varint,3,opt,name=creatorId,proto3" json:"creatorId,omitempty"` + Participants []int64 `protobuf:"varint,4,rep,packed,name=participants,proto3" json:"participants,omitempty"` +} + +func (x *AddChatSessionsReq) GetType() string { + if x != nil { + return x.Type + } + return "" +} + +func (x *AddChatSessionsReq) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *AddChatSessionsReq) GetCreatorId() int64 { + if x != nil { + return x.CreatorId + } + return 0 +} + +func (x *AddChatSessionsReq) GetParticipants() []int64 { + if x != nil { + return x.Participants + } + return nil +} + +type AddChatSessionsResp struct { + Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` +} + +func (x *AddChatSessionsResp) GetId() int64 { + if x != nil { + return x.Id + } + return 0 +} + +type UpdateChatSessionsReq struct { + Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + Name *string `protobuf:"bytes,2,opt,name=name,proto3,oneof" json:"name,omitempty"` + LastMessage *string `protobuf:"bytes,3,opt,name=lastMessage,proto3,oneof" json:"lastMessage,omitempty"` + LastMessageAt *int64 `protobuf:"varint,4,opt,name=lastMessageAt,proto3,oneof" json:"lastMessageAt,omitempty"` +} + +func (x *UpdateChatSessionsReq) GetId() int64 { + if x != nil { + return x.Id + } + return 0 +} + +func (x *UpdateChatSessionsReq) GetName() *string { + if x != nil { + return x.Name + } + return nil +} + +func (x *UpdateChatSessionsReq) GetLastMessage() *string { + if x != nil { + return x.LastMessage + } + return nil +} + +func (x *UpdateChatSessionsReq) GetLastMessageAt() *int64 { + if x != nil { + return x.LastMessageAt + } + return nil +} + +type UpdateChatSessionsResp struct{} + +type DelChatSessionsReq struct { + Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` +} + +func (x *DelChatSessionsReq) GetId() int64 { + if x != nil { + return x.Id + } + return 0 +} + +type DelChatSessionsResp struct{} + +type GetChatSessionsByIdReq struct { + Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` +} + +func (x *GetChatSessionsByIdReq) GetId() int64 { + if x != nil { + return x.Id + } + return 0 +} + +type GetChatSessionsByIdResp struct { + ChatSessions *ChatSessions `protobuf:"bytes,1,opt,name=chatSessions,proto3" json:"chatSessions,omitempty"` +} + +func (x *GetChatSessionsByIdResp) GetChatSessions() *ChatSessions { + if x != nil { + return x.ChatSessions + } + return nil +} + +type SearchChatSessionsReq struct { + Page int64 `protobuf:"varint,1,opt,name=page,proto3" json:"page,omitempty"` + Limit int64 `protobuf:"varint,2,opt,name=limit,proto3" json:"limit,omitempty"` + UserId *int64 `protobuf:"varint,3,opt,name=userId,proto3,oneof" json:"userId,omitempty"` + Type *string `protobuf:"bytes,4,opt,name=type,proto3,oneof" json:"type,omitempty"` +} + +func (x *SearchChatSessionsReq) GetPage() int64 { + if x != nil { + return x.Page + } + return 0 +} + +func (x *SearchChatSessionsReq) GetLimit() int64 { + if x != nil { + return x.Limit + } + return 0 +} + +func (x *SearchChatSessionsReq) GetUserId() *int64 { + if x != nil { + return x.UserId + } + return nil +} + +func (x *SearchChatSessionsReq) GetType() *string { + if x != nil { + return x.Type + } + return nil +} + +type SearchChatSessionsResp struct { + ChatSessions []*ChatSessions `protobuf:"bytes,1,rep,name=chatSessions,proto3" json:"chatSessions,omitempty"` +} + +func (x *SearchChatSessionsResp) GetChatSessions() []*ChatSessions { + if x != nil { + return x.ChatSessions + } + return nil +} + +type AddParticipantReq struct { + SessionId int64 `protobuf:"varint,1,opt,name=sessionId,proto3" json:"sessionId,omitempty"` + UserId int64 `protobuf:"varint,2,opt,name=userId,proto3" json:"userId,omitempty"` +} + +func (x *AddParticipantReq) GetSessionId() int64 { + if x != nil { + return x.SessionId + } + return 0 +} + +func (x *AddParticipantReq) GetUserId() int64 { + if x != nil { + return x.UserId + } + return 0 +} + +type AddParticipantResp struct{} + +type RemoveParticipantReq struct { + SessionId int64 `protobuf:"varint,1,opt,name=sessionId,proto3" json:"sessionId,omitempty"` + UserId int64 `protobuf:"varint,2,opt,name=userId,proto3" json:"userId,omitempty"` +} + +func (x *RemoveParticipantReq) GetSessionId() int64 { + if x != nil { + return x.SessionId + } + return 0 +} + +func (x *RemoveParticipantReq) GetUserId() int64 { + if x != nil { + return x.UserId + } + return 0 +} + +type RemoveParticipantResp struct{} diff --git a/app/chat/test/Dockerfile.api b/app/chat/test/Dockerfile.api new file mode 100644 index 0000000..600bdc9 --- /dev/null +++ b/app/chat/test/Dockerfile.api @@ -0,0 +1,18 @@ +FROM golang:1.25-alpine AS builder + +RUN apk add --no-cache git + +WORKDIR /build + +COPY go-wst/ go-wst/ +COPY juwan-backend/go.mod juwan-backend/go.sum juwan-backend/ +WORKDIR /build/juwan-backend +RUN go mod download + +COPY juwan-backend/ /build/juwan-backend/ +RUN CGO_ENABLED=0 go build -o /chat-api ./app/chat/api/ + +FROM alpine:latest +COPY --from=builder /chat-api /chat-api +COPY juwan-backend/app/chat/test/chat-api-test.yaml /etc/chat-api.yaml +CMD ["/chat-api", "-f", "/etc/chat-api.yaml"] diff --git a/app/chat/test/Dockerfile.rpc b/app/chat/test/Dockerfile.rpc new file mode 100644 index 0000000..940e931 --- /dev/null +++ b/app/chat/test/Dockerfile.rpc @@ -0,0 +1,18 @@ +FROM golang:1.25-alpine AS builder + +RUN apk add --no-cache git + +WORKDIR /build + +COPY go-wst/ go-wst/ +COPY juwan-backend/go.mod juwan-backend/go.sum juwan-backend/ +WORKDIR /build/juwan-backend +RUN go mod download + +COPY juwan-backend/ /build/juwan-backend/ +RUN CGO_ENABLED=0 go build -o /chat-rpc ./app/chat/rpc/ + +FROM alpine:latest +COPY --from=builder /chat-rpc /chat-rpc +COPY juwan-backend/app/chat/rpc/etc/pb.yaml /etc/pb.yaml +CMD ["/chat-rpc", "-f", "/etc/pb.yaml"] diff --git a/app/chat/test/certs/tls.crt b/app/chat/test/certs/tls.crt new file mode 100644 index 0000000..64286cd --- /dev/null +++ b/app/chat/test/certs/tls.crt @@ -0,0 +1,27 @@ +-----BEGIN CERTIFICATE----- +MIIEmjCCAwKgAwIBAgIQP64kUTHSRYb6YJNRMMkJ2DANBgkqhkiG9w0BAQsFADCB +gzEeMBwGA1UEChMVbWtjZXJ0IGRldmVsb3BtZW50IENBMSwwKgYDVQQLDCNhc2Fk +ekBBc2FkemRlTWFjLW1pbmkubG9jYWwgKEFzYWR6KTEzMDEGA1UEAwwqbWtjZXJ0 +IGFzYWR6QEFzYWR6ZGVNYWMtbWluaS5sb2NhbCAoQXNhZHopMB4XDTI2MDQyNDA3 +NDk0NVoXDTI4MDcyNDA3NDk0NVowVzEnMCUGA1UEChMebWtjZXJ0IGRldmVsb3Bt +ZW50IGNlcnRpZmljYXRlMSwwKgYDVQQLDCNhc2FkekBBc2FkemRlTWFjLW1pbmku +bG9jYWwgKEFzYWR6KTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAK96 +emv0wyPAnxxMVLMzp7iSOlRtq4ay68xTFOCCMSvgDTek91lyA1AL/zZ558C86dio +9HI43VIy70BwQEzVHdPdg1bPWn02ic4197po+k/xbKUdAxSElM2JdRkr1D6OeTz2 +y4jAqL2YZu/ZWR2PZ41TSYEnSc3UKc/ZsdOanF21w5OHpL5cNzJHcQ+8KP4vMEHd +odUwxGbp4D0/Wnd57hSO6M1XywiQRDlJq+atqiPSAG1AlI30T39KNkcfYwv90WgD +t1S8KQhYS5ddP81TUoMymQLczxoQkv4DjG3K4UhnscRNXa3IaVWAkXX1x4eakd3X +jKh9uNCxTtPM+iFKmbUCAwEAAaOBtDCBsTAOBgNVHQ8BAf8EBAMCBaAwEwYDVR0l +BAwwCgYIKwYBBQUHAwEwHwYDVR0jBBgwFoAUYOQEJcqeAhlfeIBUbdPBAFGj9Ogw +aQYDVR0RBGIwYIIJbG9jYWxob3N0gghjaGF0LWFwaYIiY2hhdC1hcGkuZGVmYXVs +dC5zdmMuY2x1c3Rlci5sb2NhbIINKi5qdXdhbi5sb2NhbIcEfwAAAYcQAAAAAAAA +AAAAAAAAAAAAATANBgkqhkiG9w0BAQsFAAOCAYEABQeqvTqcNpT9cnTdz0kwNlHW +f6GGfYQ39ZZ1XTwKbFKKner+0Oe+WkoQnMt0sTx/ImOMpC4LAaq08pU0k85d4lQA +yGSv8mnWLyEVFnU02cfeIcMhV6qrl5Od/g4Ow2JRRlMQxg/FRzNtzIIcPwi46K5V +mozMXIf6QOUGa4wPrh7AdybYnA2YPmJJrNCwI2ycHtapmo3T5oO1dm+KWSWbYrx7 +yiN6ZBTxaxESJfjPYCrSNXnzRuXrseDIlKYyU0j3GMmbaSOYHVWSTnsB/Mei9Tff +uLHOalyawbsgjqT4xVd7MFXni/mk2FDwJcPH8WAg0KgHZ+M7j8oWkqj5RS8skBFc +EK6Y4PbYRjKUWESQPGBbUwjkjSPYz2KiWz4cnXyL2MnAg1BPUskNrBfPUEWIkpWe +XyljMEOofQBw7G9QFIrQwWD3I8ps+KicskgcUoY62AkGh4Ky0X84tJCIrS3bwkCi +OR681vZWZqPpG3sj7zpmcnAibA0Y7Jpj+9RoR1D9 +-----END CERTIFICATE----- diff --git a/app/chat/test/certs/tls.key b/app/chat/test/certs/tls.key new file mode 100644 index 0000000..24374af --- /dev/null +++ b/app/chat/test/certs/tls.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCvenpr9MMjwJ8c +TFSzM6e4kjpUbauGsuvMUxTggjEr4A03pPdZcgNQC/82eefAvOnYqPRyON1SMu9A +cEBM1R3T3YNWz1p9NonONfe6aPpP8WylHQMUhJTNiXUZK9Q+jnk89suIwKi9mGbv +2Vkdj2eNU0mBJ0nN1CnP2bHTmpxdtcOTh6S+XDcyR3EPvCj+LzBB3aHVMMRm6eA9 +P1p3ee4UjujNV8sIkEQ5Savmraoj0gBtQJSN9E9/SjZHH2ML/dFoA7dUvCkIWEuX +XT/NU1KDMpkC3M8aEJL+A4xtyuFIZ7HETV2tyGlVgJF19ceHmpHd14yofbjQsU7T +zPohSpm1AgMBAAECggEAAiAq25343e/WkWN/ObzISJlBoUvz2S5dt93vlimvGWua +1oa+mPoLCnfJl8V2UQMGiqGCRFojLkvMLknEq7pbKsj3gXKU3Ii42z7KQbN2Rh01 +cosPsBX2xqGRCuBTTcsqjf7boC24IiIbH5ZvBng9K5OX61PQAmsGEZdsknxez4kz +nnc20PbQ2HlZZ8oOTgmGFoYil2q40Lfj3VOVwaIidFfy8MAZNA6T3tmh6NFvupBy +GPNTLPDqu4b5MUX5UxX/QV+cusK6h5rcyoLdmFY8jAEstIISmtx5HaJXw5oB0+6w ++r5F+LCNVqmS5DUfvfAHiQ5TnACyy8QKsrSnbC1SAwKBgQDBvYgeEOhI5CvLmRe9 +JCgHO/Pu14Gg63oQN3uHakjKrUOo8fFXSUMpGJZy7PPeAZJ0ssFJLarEcn+9df25 +ksluZbinwJs71rWHp1jZNVHvvHuuPxV+exrwZO7FrIUUjmZCKH8thKcGcNfAT74q +V6WHDeQLzdAnEzibmuZnKHe7OwKBgQDn3pbyxuCgA3aZBOiVvgdVuRoCRyTRV3I8 +LbabdpDvr3UvnZy/uPQFaFcmxaasHAy8ZJulbWrLGcDxK8CZRu8xLQY6I53djd95 +8v/YIvFmWmTnkMz7qibNqKJjKVWN4WbZcd1b+Wu6kMzkkVsIU36bHoYiReQXuJwq +7lvbV+lPzwKBgQCPEXNPIJUoHrbopqkNF4IntXIxUht7xehhyVcDbM1MPh7Ux7W9 +C3D5DBstyyVbMDYCz25Ep+CPKS6Drnora+YsDBoMZwM7cRakkkPeQq27J6j9x8AL +osUF+MMKXpf30iBZgqZH6smcy//HGBwKEKc/0FYzEU1BTcRjxEOYsh2YuQKBgGOw +pvOwoAkMFCSMILeo4RxxHgaWsfSzhTDscpN6savroxWazTb8/SWKC9ZmqldbI/qn +wueoGH9EDllid0cvYU2iTwgWIhyMj+WtnWQ++c0I1lNdRVR6fn5zn4XE0rzSiVa6 +BvMxVKj88qre9+WniEqHICKCLCQqwjIPEz1GGdCvAoGAP8fzIc/1esph4FT94SxR +CWYGKskH2/iv7LeB9xI+uS4/oz1hZ9lLhZYfFzYzyGKQjDLLDAI7mFdS30VHjYBu +/lYZOqvs9awQjCXQ0BftU0P2wU+ANBLEZPKxyquZqItQzRavOV3a5/1iI7//vDeN +OWMzztsAP0sRb2ns95zWpiQ= +-----END PRIVATE KEY----- diff --git a/app/chat/test/chat-api-local.yaml b/app/chat/test/chat-api-local.yaml new file mode 100644 index 0000000..9f72f9c --- /dev/null +++ b/app/chat/test/chat-api-local.yaml @@ -0,0 +1,29 @@ +Name: chat-api +Host: 0.0.0.0 +Port: 28888 + +Hybrid: + Name: chat-hybrid + Protocol: auto + Ws: + Name: chat-ws + Addr: :28889 + Path: /ws/chat + MaxConnections: 10000 + Auth: + Enabled: true + Source: envoy-header + HeaderName: X-User-ID + FallbackStrategy: auto + MaxRetries: 3 + MaxConnections: 10000 + Auth: + Enabled: true + WsHeaderName: X-User-ID + +Stateless: + PollInterval: 100ms + BatchSize: 100 + +Log: + Level: debug diff --git a/app/chat/test/chat-api-test.yaml b/app/chat/test/chat-api-test.yaml new file mode 100644 index 0000000..6c252e7 --- /dev/null +++ b/app/chat/test/chat-api-test.yaml @@ -0,0 +1,37 @@ +Name: chat-api +Host: 0.0.0.0 +Port: 8888 + +Hybrid: + Name: chat-hybrid + Protocol: auto + Ws: + Name: chat-ws + Addr: :8889 + Path: /ws/chat + MaxConnections: 10000 + Auth: + Enabled: true + Source: envoy-header + HeaderName: X-User-ID + Wt: + Addr: :8443 + Path: /wt/chat + CertFile: /etc/certs/tls.crt + KeyFile: /etc/certs/tls.key + FallbackStrategy: auto + MaxRetries: 3 + MaxConnections: 10000 + Auth: + Enabled: true + WsHeaderName: X-User-ID + WtTokenSource: query + WtTokenName: token + WtJWTSecret: test-secret + +Stateless: + PollInterval: 100ms + BatchSize: 100 + +Log: + Level: debug diff --git a/app/chat/test/docker-compose.yml b/app/chat/test/docker-compose.yml new file mode 100644 index 0000000..43825b5 --- /dev/null +++ b/app/chat/test/docker-compose.yml @@ -0,0 +1,17 @@ +services: + chat-api: + build: + context: ../../../../ + dockerfile: juwan-backend/app/chat/test/Dockerfile.api + container_name: chat-api-test + ports: + - "28888:8888" + - "28889:8889" + - "28443:8443/udp" + volumes: + - ./certs:/etc/certs:ro + healthcheck: + test: ["CMD", "true"] + interval: 2s + timeout: 2s + retries: 5 diff --git a/app/chat/test/pb-local.yaml b/app/chat/test/pb-local.yaml new file mode 100644 index 0000000..4603d4c --- /dev/null +++ b/app/chat/test/pb-local.yaml @@ -0,0 +1,5 @@ +Name: pb.rpc +ListenOn: 0.0.0.0:28080 + +Log: + Level: debug diff --git a/app/chat/test/run_tests.sh b/app/chat/test/run_tests.sh new file mode 100755 index 0000000..04bbed1 --- /dev/null +++ b/app/chat/test/run_tests.sh @@ -0,0 +1,62 @@ +#!/bin/bash +set -e + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +LOG_DIR="$SCRIPT_DIR/logs" +mkdir -p "$LOG_DIR" + +echo "=== Chat Service Test Runner ===" +echo "Log directory: $LOG_DIR" +echo "" + +cleanup() { + echo "" + echo "=== Collecting container logs ===" + docker compose -f "$SCRIPT_DIR/docker-compose.yml" logs chat-rpc > "$LOG_DIR/chat-rpc.log" 2>&1 || true + docker compose -f "$SCRIPT_DIR/docker-compose.yml" logs chat-api > "$LOG_DIR/chat-api.log" 2>&1 || true + echo "=== Stopping containers ===" + docker compose -f "$SCRIPT_DIR/docker-compose.yml" down --remove-orphans 2>/dev/null || true +} +trap cleanup EXIT + +echo "=== Step 1: Building Docker images ===" +docker compose -f "$SCRIPT_DIR/docker-compose.yml" build 2>&1 | tee "$LOG_DIR/build.log" +echo "" + +echo "=== Step 2: Starting services ===" +docker compose -f "$SCRIPT_DIR/docker-compose.yml" up -d 2>&1 | tee -a "$LOG_DIR/build.log" +echo "" + +echo "=== Step 3: Waiting for services to be ready ===" +for i in $(seq 1 30); do + if curl -s http://localhost:28888 > /dev/null 2>&1 || [ $i -eq 30 ]; then + break + fi + echo " waiting... ($i/30)" + sleep 2 +done +sleep 3 +echo "Services should be ready." +echo "" + +echo "=== Step 4: Running WebSocket tests ===" +cd "$SCRIPT_DIR" +python3 test_ws.py 2>&1 | tee "$LOG_DIR/ws_test_stdout.log" +WS_RC=${PIPESTATUS[0]} +echo "" + +echo "=== Step 5: Running WebTransport fallback tests ===" +python3 test_wt.py 2>&1 | tee "$LOG_DIR/wt_test_stdout.log" +WT_RC=${PIPESTATUS[0]} +echo "" + +echo "=== Test Summary ===" +echo "WebSocket test: $([ $WS_RC -eq 0 ] && echo 'PASSED' || echo 'FAILED')" +echo "WebTransport test: $([ $WT_RC -eq 0 ] && echo 'PASSED' || echo 'FAILED')" +echo "" +echo "Logs saved to: $LOG_DIR/" +ls -la "$LOG_DIR/" + +if [ $WS_RC -ne 0 ] || [ $WT_RC -ne 0 ]; then + exit 1 +fi diff --git a/app/chat/test/test_ws.py b/app/chat/test/test_ws.py new file mode 100644 index 0000000..d834803 --- /dev/null +++ b/app/chat/test/test_ws.py @@ -0,0 +1,125 @@ +#!/usr/bin/env python3 +"""WebSocket chat test — group chat + DM flows.""" + +import asyncio +import json +import sys +import time + +try: + import websockets +except ImportError: + print("installing websockets...") + import subprocess + subprocess.check_call([sys.executable, "-m", "pip", "install", "websockets", "-q"]) + import websockets + +WS_URL = "ws://localhost:28889/ws/chat" +RESULTS = [] + +def log(tag, msg): + ts = time.strftime("%H:%M:%S") + line = f"[{ts}] [{tag}] {msg}" + print(line) + RESULTS.append(line) + +async def recv_json(ws, timeout=5): + raw = await asyncio.wait_for(ws.recv(), timeout=timeout) + return json.loads(raw) + +async def send_json(ws, data): + await ws.send(json.dumps(data)) + +async def test_ws(): + log("TEST", "=== WebSocket Chat Test Start ===") + + log("WS", "connecting user1...") + user1 = await websockets.connect(WS_URL, additional_headers={"X-User-ID": "1001"}) + resp = await recv_json(user1) + log("WS", f"user1 connected: {resp}") + assert resp["type"] == "connected", f"expected connected, got {resp['type']}" + + log("WS", "connecting user2...") + user2 = await websockets.connect(WS_URL, additional_headers={"X-User-ID": "1002"}) + resp = await recv_json(user2) + log("WS", f"user2 connected: {resp}") + assert resp["type"] == "connected" + + log("TEST", "--- Test 1: Create Group ---") + await send_json(user1, {"type": "create_group", "name": "test-room"}) + resp = await recv_json(user1) + log("WS", f"create_group response: {resp}") + assert resp["type"] == "group_created", f"expected group_created, got {resp['type']}" + group_id = resp["sessionId"] + log("TEST", f"group created with id={group_id}") + + log("TEST", "--- Test 2: Create DM ---") + await send_json(user1, {"type": "create_dm", "targetId": 1002}) + resp = await recv_json(user1) + log("WS", f"create_dm response: {resp}") + assert resp["type"] == "dm_created", f"expected dm_created, got {resp['type']}" + dm_id = resp["sessionId"] + log("TEST", f"DM created with id={dm_id}") + + log("TEST", "--- Test 3: Join Group ---") + await send_json(user1, {"type": "join", "sessionId": group_id}) + msgs = [] + for _ in range(2): + try: + r = await recv_json(user1, timeout=3) + msgs.append(r) + log("WS", f"join msg: {r}") + except asyncio.TimeoutError: + break + types = {m["type"] for m in msgs} + assert "joined" in types, f"expected 'joined' in {types}" + log("TEST", f"join received types: {types}") + + log("TEST", "--- Test 4: Send Message in Group ---") + await send_json(user1, {"type": "message", "sessionId": group_id, "content": "hello group!"}) + resp = await recv_json(user1) + log("WS", f"message broadcast: {resp}") + assert resp["type"] == "message", f"expected message, got {resp['type']}" + assert resp["content"] == "hello group!" + + log("TEST", "--- Test 5: Send DM ---") + await send_json(user1, {"type": "message", "sessionId": dm_id, "content": "hello DM!"}) + resp = await recv_json(user1) + log("WS", f"DM message: {resp}") + assert resp["type"] == "message" + assert resp["content"] == "hello DM!" + + log("TEST", "--- Test 6: Message History ---") + await send_json(user1, {"type": "history", "sessionId": group_id}) + resp = await recv_json(user1) + log("WS", f"history response: type={resp['type']} data_len={len(resp.get('data', []))}") + assert resp["type"] == "history" + + log("TEST", "--- Test 7: Invalid Message ---") + await send_json(user1, {"type": "unknown_action"}) + resp = await recv_json(user1) + log("WS", f"error response: {resp}") + assert resp["type"] == "error" + + log("TEST", "--- Test 8: Leave Group ---") + await send_json(user1, {"type": "leave", "sessionId": group_id}) + + await user1.close() + await user2.close() + log("TEST", "=== WebSocket Chat Test PASSED ===") + +async def main(): + try: + await test_ws() + return 0 + except Exception as e: + log("FAIL", f"Test failed: {e}") + import traceback + log("FAIL", traceback.format_exc()) + return 1 + +if __name__ == "__main__": + rc = asyncio.run(main()) + with open("logs/ws_test.log", "w") as f: + f.write("\n".join(RESULTS) + "\n") + sys.exit(rc) diff --git a/app/chat/test/test_wt.py b/app/chat/test/test_wt.py new file mode 100644 index 0000000..3dd3fbd --- /dev/null +++ b/app/chat/test/test_wt.py @@ -0,0 +1,129 @@ +#!/usr/bin/env python3 +"""WebTransport fallback test — verifies hybrid mode falls back to WS when WT is unavailable.""" + +import asyncio +import json +import sys +import time +import urllib.request +import urllib.error + +try: + import websockets +except ImportError: + import subprocess + subprocess.check_call([sys.executable, "-m", "pip", "install", "websockets", "-q"]) + import websockets + +WS_URL = "ws://localhost:28889/ws/chat" +API_BASE = "http://localhost:28888" +RESULTS = [] + +def log(tag, msg): + ts = time.strftime("%H:%M:%S") + line = f"[{ts}] [{tag}] {msg}" + print(line) + RESULTS.append(line) + +async def recv_json(ws, timeout=5): + raw = await asyncio.wait_for(ws.recv(), timeout=timeout) + return json.loads(raw) + +async def send_json(ws, data): + await ws.send(json.dumps(data)) + +async def test_wt_fallback(): + log("TEST", "=== WebTransport Fallback Test Start ===") + + log("WT", "--- Test 1: WT not configured, WS fallback should work ---") + log("WT", "connecting via WS (fallback path)...") + ws = await websockets.connect(WS_URL, additional_headers={"X-User-ID": "2001"}) + resp = await recv_json(ws) + log("WT", f"fallback WS connected: {resp}") + assert resp["type"] == "connected", f"expected connected, got {resp['type']}" + + log("WT", "--- Test 2: Full chat flow over fallback WS ---") + await send_json(ws, {"type": "create_group", "name": "wt-fallback-room"}) + resp = await recv_json(ws) + log("WT", f"create_group via fallback: {resp}") + assert resp["type"] == "group_created" + group_id = resp["sessionId"] + + await send_json(ws, {"type": "join", "sessionId": group_id}) + resp1 = await recv_json(ws) + log("WT", f"join broadcast: {resp1}") + resp2 = await recv_json(ws) + log("WT", f"join confirm: {resp2}") + + await send_json(ws, {"type": "message", "sessionId": group_id, "content": "hello from WT fallback!"}) + resp = await recv_json(ws) + log("WT", f"message via fallback: {resp}") + assert resp["type"] == "message" + assert resp["content"] == "hello from WT fallback!" + + log("WT", "--- Test 3: DM over fallback ---") + await send_json(ws, {"type": "create_dm", "targetId": 2002}) + resp = await recv_json(ws) + log("WT", f"DM created via fallback: {resp}") + assert resp["type"] == "dm_created" + dm_id = resp["sessionId"] + + await send_json(ws, {"type": "message", "sessionId": dm_id, "content": "DM via fallback"}) + resp = await recv_json(ws) + log("WT", f"DM message via fallback: {resp}") + assert resp["type"] == "message" + + log("WT", "--- Test 4: History over fallback ---") + await send_json(ws, {"type": "history", "sessionId": group_id}) + resp = await recv_json(ws) + log("WT", f"history via fallback: type={resp['type']}") + assert resp["type"] == "history" + + log("WT", "--- Test 5: Multi-user over fallback ---") + ws2 = await websockets.connect(WS_URL, additional_headers={"X-User-ID": "2002"}) + resp = await recv_json(ws2) + assert resp["type"] == "connected" + log("WT", "user2 connected via fallback WS") + + await send_json(ws, {"type": "message", "sessionId": dm_id, "content": "cross-user DM"}) + resp = await recv_json(ws) + log("WT", f"sender got broadcast: {resp}") + assert resp["type"] == "message" + + try: + resp2 = await recv_json(ws2, timeout=2) + log("WT", f"user2 got message: {resp2}") + except asyncio.TimeoutError: + log("WT", "user2 did not receive (not joined to session, expected)") + + await ws.close() + await ws2.close() + + log("WT", "--- Test 6: Verify WT port is not serving (no TLS configured) ---") + try: + wt_ws = await asyncio.wait_for( + websockets.connect("ws://localhost:28443/wt/chat"), + timeout=2 + ) + await wt_ws.close() + log("WT", "WT port unexpectedly open (might be OK if hybrid exposes it)") + except (ConnectionRefusedError, asyncio.TimeoutError, OSError): + log("WT", "WT port not available (expected — no TLS cert configured)") + + log("TEST", "=== WebTransport Fallback Test PASSED ===") + +async def main(): + try: + await test_wt_fallback() + return 0 + except Exception as e: + log("FAIL", f"Test failed: {e}") + import traceback + log("FAIL", traceback.format_exc()) + return 1 + +if __name__ == "__main__": + rc = asyncio.run(main()) + with open("logs/wt_test.log", "w") as f: + f.write("\n".join(RESULTS) + "\n") + sys.exit(rc) diff --git a/desc/rpc/chat.proto b/desc/rpc/chat.proto new file mode 100644 index 0000000..ce88542 --- /dev/null +++ b/desc/rpc/chat.proto @@ -0,0 +1,155 @@ +syntax = "proto3"; + +option go_package ="./pb"; + +package pb; + +// ------------------------------------ +// Messages +// ------------------------------------ + +//--------------------------------chatSessions-------------------------------- +message ChatSessions { + int64 id = 1; //id + string type = 2; //type: group, dm + string name = 3; //name + int64 creatorId = 4; //creatorId + repeated int64 participants = 5; //participants + string lastMessage = 6; //lastMessage + int64 lastMessageAt = 7; //lastMessageAt + int64 createdAt = 8; //createdAt + int64 updatedAt = 9; //updatedAt +} + +message AddChatSessionsReq { + string type = 1; //type: group, dm + string name = 2; //name + int64 creatorId = 3; //creatorId + repeated int64 participants = 4; //participants +} + +message AddChatSessionsResp { + int64 id = 1; //id +} + +message UpdateChatSessionsReq { + int64 id = 1; //id + optional string name = 2; //name + optional string lastMessage = 3; //lastMessage + optional int64 lastMessageAt = 4; //lastMessageAt +} + +message UpdateChatSessionsResp { +} + +message DelChatSessionsReq { + int64 id = 1; //id +} + +message DelChatSessionsResp { +} + +message GetChatSessionsByIdReq { + int64 id = 1; //id +} + +message GetChatSessionsByIdResp { + ChatSessions chatSessions = 1; //chatSessions +} + +message SearchChatSessionsReq { + int64 page = 1; //page + int64 limit = 2; //limit + optional int64 userId = 3; //userId (filter sessions containing this user) + optional string type = 4; //type +} + +message SearchChatSessionsResp { + repeated ChatSessions chatSessions = 1; //chatSessions +} + +message AddParticipantReq { + int64 sessionId = 1; //sessionId + int64 userId = 2; //userId +} + +message AddParticipantResp { +} + +message RemoveParticipantReq { + int64 sessionId = 1; //sessionId + int64 userId = 2; //userId +} + +message RemoveParticipantResp { +} + +//--------------------------------chatMessages-------------------------------- +message ChatMessages { + int64 id = 1; //id + int64 sessionId = 2; //sessionId + int64 senderId = 3; //senderId + string type = 4; //type: text, image, system + string content = 5; //content + int64 createdAt = 6; //createdAt +} + +message AddChatMessagesReq { + int64 sessionId = 1; //sessionId + int64 senderId = 2; //senderId + string type = 3; //type + string content = 4; //content +} + +message AddChatMessagesResp { + int64 id = 1; //id +} + +message DelChatMessagesReq { + int64 id = 1; //id +} + +message DelChatMessagesResp { +} + +message GetChatMessagesByIdReq { + int64 id = 1; //id +} + +message GetChatMessagesByIdResp { + ChatMessages chatMessages = 1; //chatMessages +} + +message SearchChatMessagesReq { + int64 page = 1; //page + int64 limit = 2; //limit + int64 sessionId = 3; //sessionId + optional int64 senderId = 4; //senderId +} + +message SearchChatMessagesResp { + repeated ChatMessages chatMessages = 1; //chatMessages +} + + +// ------------------------------------ +// Rpc Func +// ------------------------------------ + +service chatService{ + + //-----------------------chatSessions----------------------- + rpc AddChatSessions(AddChatSessionsReq) returns (AddChatSessionsResp); + rpc UpdateChatSessions(UpdateChatSessionsReq) returns (UpdateChatSessionsResp); + rpc DelChatSessions(DelChatSessionsReq) returns (DelChatSessionsResp); + rpc GetChatSessionsById(GetChatSessionsByIdReq) returns (GetChatSessionsByIdResp); + rpc SearchChatSessions(SearchChatSessionsReq) returns (SearchChatSessionsResp); + rpc AddParticipant(AddParticipantReq) returns (AddParticipantResp); + rpc RemoveParticipant(RemoveParticipantReq) returns (RemoveParticipantResp); + //-----------------------chatMessages----------------------- + rpc AddChatMessages(AddChatMessagesReq) returns (AddChatMessagesResp); + rpc DelChatMessages(DelChatMessagesReq) returns (DelChatMessagesResp); + rpc GetChatMessagesById(GetChatMessagesByIdReq) returns (GetChatMessagesByIdResp); + rpc SearchChatMessages(SearchChatMessagesReq) returns (SearchChatMessagesResp); + +}