From 2ec2075c162229f0912c320124f733a67625f664 Mon Sep 17 00:00:00 2001 From: wwweww <2646787260@qq.com> Date: Sat, 25 Apr 2026 08:29:25 +0800 Subject: [PATCH] Modify the code logic and add a mongo svc context --- app/chat/api/internal/config/config.go | 13 + .../api/internal/handler/chat/messaging.go | 36 ++- app/chat/api/internal/svc/serviceContext.go | 37 ++- app/chat/chatcore/store.go | 268 ++---------------- go.mod | 6 + go.sum | 8 + 6 files changed, 103 insertions(+), 265 deletions(-) diff --git a/app/chat/api/internal/config/config.go b/app/chat/api/internal/config/config.go index 84c9d68..f09a270 100644 --- a/app/chat/api/internal/config/config.go +++ b/app/chat/api/internal/config/config.go @@ -6,8 +6,21 @@ import ( "github.com/zeromicro/go-zero/rest" ) +type MongoConf struct { + URI string `json:",default=mongodb://localhost:27017"` + Database string `json:",default=juwan_chat"` +} + +type RedisConf struct { + Addr string `json:",default=localhost:6379"` + Password string `json:",optional"` + DB int `json:",default=0"` +} + type Config struct { rest.RestConf Hybrid hybrid.HybridConf Stateless stateless.Config + Mongo MongoConf + Redis RedisConf } diff --git a/app/chat/api/internal/handler/chat/messaging.go b/app/chat/api/internal/handler/chat/messaging.go index bba909b..866177e 100644 --- a/app/chat/api/internal/handler/chat/messaging.go +++ b/app/chat/api/internal/handler/chat/messaging.go @@ -64,12 +64,6 @@ func (h *Handler) handleJoin(conn protocol.Connection, msg *WsMessage) error { func (h *Handler) handleLeave(conn protocol.Connection, msg *WsMessage) error { uid := h.getUserId(conn) - if uid <= 0 { - return conn.SendJSON(context.Background(), WsResponse{ - Type: "error", - Content: "authentication required", - }) - } sessionId := msg.SessionId if sessionId <= 0 { if sid, ok := conn.Metadata()["sessionId"].(int64); ok { @@ -79,12 +73,6 @@ func (h *Handler) handleLeave(conn protocol.Connection, msg *WsMessage) error { if sessionId <= 0 { return nil } - if !h.svcCtx.Store.IsParticipant(sessionId, uid) { - return conn.SendJSON(context.Background(), WsResponse{ - Type: "error", - Content: "not a member of this session", - }) - } session, err := h.svcCtx.Store.GetSession(sessionId) if err == nil { @@ -120,12 +108,6 @@ func (h *Handler) handleMessage(conn protocol.Connection, msg *WsMessage) error Content: "sessionId is required, join a session first", }) } - if !h.svcCtx.Store.IsParticipant(sessionId, uid) { - return conn.SendJSON(context.Background(), WsResponse{ - Type: "error", - Content: "not a member of this session", - }) - } msgType := chatcore.MessageType(msg.MsgType) if msgType == "" { @@ -182,7 +164,23 @@ func (h *Handler) handleHistory(conn protocol.Connection, msg *WsMessage) error Content: "sessionId is required", }) } - if !h.svcCtx.Store.IsParticipant(msg.SessionId, uid) { + + session, err := h.svcCtx.Store.GetSession(msg.SessionId) + if err != nil { + return conn.SendJSON(context.Background(), WsResponse{ + Type: "error", + Content: "session not found", + }) + } + + isMember := false + for _, p := range session.Participants { + if p == uid { + isMember = true + break + } + } + if !isMember { return conn.SendJSON(context.Background(), WsResponse{ Type: "error", Content: "not a member of this session", diff --git a/app/chat/api/internal/svc/serviceContext.go b/app/chat/api/internal/svc/serviceContext.go index 7fe1906..a6b6347 100644 --- a/app/chat/api/internal/svc/serviceContext.go +++ b/app/chat/api/internal/svc/serviceContext.go @@ -1,22 +1,55 @@ package svc import ( + "context" + "log" + "time" + "juwan-backend/app/chat/api/internal/config" "juwan-backend/app/chat/chatcore" + "github.com/redis/go-redis/v9" "github.com/wwweww/go-wst/stateless" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" ) type ServiceContext struct { Config config.Config - Store *chatcore.Store + Store chatcore.Store MsgStore *stateless.MemoryStore } func NewServiceContext(c config.Config) *ServiceContext { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + mongoClient, err := mongo.Connect(ctx, options.Client().ApplyURI(c.Mongo.URI)) + if err != nil { + log.Fatalf("mongo connect: %v", err) + } + if err := mongoClient.Ping(ctx, nil); err != nil { + log.Fatalf("mongo ping: %v", err) + } + + db := mongoClient.Database(c.Mongo.Database) + mongoStore, err := chatcore.NewMongoStore(db) + if err != nil { + log.Fatalf("mongo store: %v", err) + } + + rdb := redis.NewClient(&redis.Options{ + Addr: c.Redis.Addr, + Password: c.Redis.Password, + DB: c.Redis.DB, + }) + if err := rdb.Ping(ctx).Err(); err != nil { + log.Fatalf("redis ping: %v", err) + } + return &ServiceContext{ Config: c, - Store: chatcore.NewStore(), + Store: chatcore.NewCachedStore(mongoStore, rdb), MsgStore: stateless.NewMemoryStore(), } } diff --git a/app/chat/chatcore/store.go b/app/chat/chatcore/store.go index 65b422e..9d5686f 100644 --- a/app/chat/chatcore/store.go +++ b/app/chat/chatcore/store.go @@ -1,11 +1,5 @@ package chatcore -import ( - "errors" - "sync" - "time" -) - type SessionType string const ( @@ -22,247 +16,33 @@ const ( ) type Session struct { - Id int64 `json:"id"` - Type SessionType `json:"type"` - Name string `json:"name"` - CreatorId int64 `json:"creatorId"` - Participants []int64 `json:"participants"` - LastMessage string `json:"lastMessage"` - LastMessageAt int64 `json:"lastMessageAt"` - CreatedAt int64 `json:"createdAt"` - UpdatedAt int64 `json:"updatedAt"` + Id int64 `json:"id" bson:"_id"` + Type SessionType `json:"type" bson:"type"` + Name string `json:"name" bson:"name"` + CreatorId int64 `json:"creatorId" bson:"creatorId"` + Participants []int64 `json:"participants" bson:"participants"` + LastMessage string `json:"lastMessage" bson:"lastMessage"` + LastMessageAt int64 `json:"lastMessageAt" bson:"lastMessageAt"` + CreatedAt int64 `json:"createdAt" bson:"createdAt"` + UpdatedAt int64 `json:"updatedAt" bson:"updatedAt"` } type Message struct { - Id int64 `json:"id"` - SessionId int64 `json:"sessionId"` - SenderId int64 `json:"senderId"` - Type MessageType `json:"type"` - Content string `json:"content"` - CreatedAt int64 `json:"createdAt"` + Id int64 `json:"id" bson:"_id"` + SessionId int64 `json:"sessionId" bson:"sessionId"` + SenderId int64 `json:"senderId" bson:"senderId"` + Type MessageType `json:"type" bson:"type"` + Content string `json:"content" bson:"content"` + CreatedAt int64 `json:"createdAt" bson:"createdAt"` } -type Store struct { - mu sync.RWMutex - - nextSessionID int64 - nextMessageID int64 - - Sessions map[int64]*Session - Messages map[int64]*Message - SessionMessages map[int64][]int64 // sessionId -> []messageId -} - -func NewStore() *Store { - return &Store{ - nextSessionID: 1000, - nextMessageID: 1000, - Sessions: make(map[int64]*Session), - Messages: make(map[int64]*Message), - SessionMessages: make(map[int64][]int64), - } -} - -func (s *Store) CreateSession(typ SessionType, name string, creatorId int64, participants []int64) (*Session, error) { - if creatorId <= 0 { - return nil, errors.New("creatorId is required") - } - - s.mu.Lock() - defer s.mu.Unlock() - - now := time.Now().Unix() - ps := append([]int64(nil), participants...) - hasCreator := false - for _, p := range ps { - if p == creatorId { - hasCreator = true - break - } - } - if !hasCreator { - ps = append(ps, creatorId) - } - - s.nextSessionID++ - session := &Session{ - Id: s.nextSessionID, - Type: typ, - Name: name, - CreatorId: creatorId, - Participants: ps, - CreatedAt: now, - UpdatedAt: now, - } - s.Sessions[session.Id] = session - return session, nil -} - -func (s *Store) GetSession(id int64) (*Session, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - session, ok := s.Sessions[id] - if !ok { - return nil, errors.New("session not found") - } - return session, nil -} - -func (s *Store) IsParticipant(sessionId, userId int64) bool { - s.mu.RLock() - defer s.mu.RUnlock() - - session, ok := s.Sessions[sessionId] - if !ok { - return false - } - for _, p := range session.Participants { - if p == userId { - return true - } - } - return false -} - -func (s *Store) ListUserSessions(userId int64, page, limit int) []*Session { - s.mu.RLock() - defer s.mu.RUnlock() - - var results []*Session - for _, sess := range s.Sessions { - for _, p := range sess.Participants { - if p == userId { - results = append(results, sess) - break - } - } - } - - offset := page * limit - if offset >= len(results) { - return nil - } - end := offset + limit - if end > len(results) { - end = len(results) - } - return results[offset:end] -} - -func (s *Store) AddParticipant(sessionId, userId int64) error { - s.mu.Lock() - defer s.mu.Unlock() - - session, ok := s.Sessions[sessionId] - if !ok { - return errors.New("session not found") - } - for _, p := range session.Participants { - if p == userId { - return nil - } - } - session.Participants = append(session.Participants, userId) - session.UpdatedAt = time.Now().Unix() - return nil -} - -func (s *Store) RemoveParticipant(sessionId, userId int64) error { - s.mu.Lock() - defer s.mu.Unlock() - - session, ok := s.Sessions[sessionId] - if !ok { - return errors.New("session not found") - } - filtered := make([]int64, 0, len(session.Participants)) - for _, p := range session.Participants { - if p != userId { - filtered = append(filtered, p) - } - } - session.Participants = filtered - session.UpdatedAt = time.Now().Unix() - return nil -} - -func (s *Store) AddMessage(sessionId, senderId int64, msgType MessageType, content string) (*Message, error) { - if sessionId <= 0 { - return nil, errors.New("sessionId is required") - } - if senderId <= 0 { - return nil, errors.New("senderId is required") - } - if content == "" { - return nil, errors.New("content is required") - } - - s.mu.Lock() - defer s.mu.Unlock() - - session, ok := s.Sessions[sessionId] - if !ok { - return nil, errors.New("session not found") - } - - now := time.Now().Unix() - if msgType == "" { - msgType = MessageTypeText - } - - s.nextMessageID++ - msg := &Message{ - Id: s.nextMessageID, - SessionId: sessionId, - SenderId: senderId, - Type: msgType, - Content: content, - CreatedAt: now, - } - s.Messages[msg.Id] = msg - s.SessionMessages[sessionId] = append(s.SessionMessages[sessionId], msg.Id) - - session.LastMessage = content - session.LastMessageAt = now - session.UpdatedAt = now - - return msg, nil -} - -func (s *Store) GetMessages(sessionId int64, page, limit int) []*Message { - s.mu.RLock() - defer s.mu.RUnlock() - - msgIDs := s.SessionMessages[sessionId] - var results []*Message - for _, id := range msgIDs { - if msg, ok := s.Messages[id]; ok { - results = append(results, msg) - } - } - - if limit <= 0 { - limit = 50 - } - offset := page * limit - if offset >= len(results) { - return nil - } - end := offset + limit - if end > len(results) { - end = len(results) - } - return results[offset:end] -} - -func (s *Store) DeleteSession(id int64) { - s.mu.Lock() - defer s.mu.Unlock() - - delete(s.Sessions, id) - for _, msgId := range s.SessionMessages[id] { - delete(s.Messages, msgId) - } - delete(s.SessionMessages, id) +type Store interface { + CreateSession(typ SessionType, name string, creatorId int64, participants []int64) (*Session, error) + GetSession(id int64) (*Session, error) + ListUserSessions(userId int64, page, limit int) []*Session + AddParticipant(sessionId, userId int64) error + RemoveParticipant(sessionId, userId int64) error + DeleteSession(id int64) + AddMessage(sessionId, senderId int64, msgType MessageType, content string) (*Message, error) + GetMessages(sessionId int64, page, limit int) []*Message } diff --git a/go.mod b/go.mod index 8e4569a..047956c 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/wwweww/go-wst v0.0.0-20260408233151-39a5da1471a4 github.com/zeromicro/go-queue v1.2.2 github.com/zeromicro/go-zero v1.10.0 + go.mongodb.org/mongo-driver v1.17.3 golang.org/x/crypto v0.46.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 google.golang.org/grpc v1.79.1 @@ -69,6 +70,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/go-cmp v0.7.0 // indirect github.com/google/gofuzz v1.2.0 // indirect @@ -90,6 +92,7 @@ require ( github.com/mitchellh/hashstructure v1.1.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/montanaflynn/stats v0.7.1 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/openzipkin/zipkin-go v0.4.3 // indirect github.com/pelletier/go-toml/v2 v2.2.4 // indirect @@ -104,6 +107,9 @@ require ( github.com/quic-go/webtransport-go v0.10.0 // indirect github.com/segmentio/kafka-go v0.4.47 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect + github.com/xdg-go/scram v1.2.0 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect + github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect github.com/zclconf/go-cty v1.14.4 // indirect github.com/zclconf/go-cty-yaml v1.1.0 // indirect go.etcd.io/etcd/api/v3 v3.5.15 // indirect diff --git a/go.sum b/go.sum index 1e1afa9..657e5ae 100644 --- a/go.sum +++ b/go.sum @@ -132,6 +132,8 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -214,6 +216,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= +github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= @@ -301,6 +305,8 @@ github.com/xdg-go/scram v1.2.0 h1:bYKF2AEwG5rqd1BumT4gAnvwU/M9nBp2pTSxeZw7Wvs= github.com/xdg-go/scram v1.2.0/go.mod h1:3dlrS0iBaWKYVt2ZfA4cj48umJZ+cAEbR6/SjLA88I8= github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= @@ -320,6 +326,8 @@ go.etcd.io/etcd/client/pkg/v3 v3.5.15 h1:fo0HpWz/KlHGMCC+YejpiCmyWDEuIpnTDzpJLB5 go.etcd.io/etcd/client/pkg/v3 v3.5.15/go.mod h1:mXDI4NAOwEiszrHCb0aqfAYNCrZP4e9hRca3d1YK8EU= go.etcd.io/etcd/client/v3 v3.5.15 h1:23M0eY4Fd/inNv1ZfU3AxrbbOdW79r9V9Rl62Nm6ip4= go.etcd.io/etcd/client/v3 v3.5.15/go.mod h1:CLSJxrYjvLtHsrPKsy7LmZEE+DK2ktfd2bN4RhBMwlU= +go.mongodb.org/mongo-driver v1.17.3 h1:TQyXhnsWfWtgAhMtOgtYHMTkZIfBTpMTsMnd9ZBeHxQ= +go.mongodb.org/mongo-driver v1.17.3/go.mod h1:Hy04i7O2kC4RS06ZrhPRqj/u4DTYkFDAAccj+rVKqgQ= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48=