From 267ad4bba6db41b826fadf5da26a995d14d85057 Mon Sep 17 00:00:00 2001 From: wwweww <2646787260@qq.com> Date: Sat, 25 Apr 2026 08:31:54 +0800 Subject: [PATCH] Database and caching logic --- app/chat/chatcore/cache.go | 128 +++++++++++++++++++++ app/chat/chatcore/mongo.go | 228 +++++++++++++++++++++++++++++++++++++ 2 files changed, 356 insertions(+) create mode 100644 app/chat/chatcore/cache.go create mode 100644 app/chat/chatcore/mongo.go diff --git a/app/chat/chatcore/cache.go b/app/chat/chatcore/cache.go new file mode 100644 index 0000000..85254d6 --- /dev/null +++ b/app/chat/chatcore/cache.go @@ -0,0 +1,128 @@ +package chatcore + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/redis/go-redis/v9" +) + +const ( + msgCachePrefix = "chat:msgs:" + msgCacheLimit = 100 + msgCacheTTL = 30 * time.Minute +) + +type CachedStore struct { + inner Store + rdb redis.UniversalClient +} + +func NewCachedStore(inner Store, rdb redis.UniversalClient) *CachedStore { + return &CachedStore{inner: inner, rdb: rdb} +} + +func msgKey(sessionId int64) string { + return fmt.Sprintf("%s%d", msgCachePrefix, sessionId) +} + +func (c *CachedStore) CreateSession(typ SessionType, name string, creatorId int64, participants []int64) (*Session, error) { + return c.inner.CreateSession(typ, name, creatorId, participants) +} + +func (c *CachedStore) GetSession(id int64) (*Session, error) { + return c.inner.GetSession(id) +} + +func (c *CachedStore) ListUserSessions(userId int64, page, limit int) []*Session { + return c.inner.ListUserSessions(userId, page, limit) +} + +func (c *CachedStore) AddParticipant(sessionId, userId int64) error { + return c.inner.AddParticipant(sessionId, userId) +} + +func (c *CachedStore) RemoveParticipant(sessionId, userId int64) error { + return c.inner.RemoveParticipant(sessionId, userId) +} + +func (c *CachedStore) DeleteSession(id int64) { + c.inner.DeleteSession(id) + c.rdb.Del(context.Background(), msgKey(id)) +} + +func (c *CachedStore) AddMessage(sessionId, senderId int64, msgType MessageType, content string) (*Message, error) { + msg, err := c.inner.AddMessage(sessionId, senderId, msgType, content) + if err != nil { + return nil, err + } + + data, _ := json.Marshal(msg) + ctx := context.Background() + key := msgKey(sessionId) + + pipe := c.rdb.Pipeline() + pipe.RPush(ctx, key, data) + pipe.LTrim(ctx, key, -msgCacheLimit, -1) + pipe.Expire(ctx, key, msgCacheTTL) + pipe.Exec(ctx) + + return msg, nil +} + +func (c *CachedStore) GetMessages(sessionId int64, page, limit int) []*Message { + if page == 0 && limit <= msgCacheLimit { + if msgs := c.getFromCache(sessionId, limit); msgs != nil { + return msgs + } + } + + msgs := c.inner.GetMessages(sessionId, page, limit) + + if page == 0 && len(msgs) > 0 { + c.warmCache(sessionId, msgs) + } + + return msgs +} + +func (c *CachedStore) getFromCache(sessionId int64, limit int) []*Message { + ctx := context.Background() + key := msgKey(sessionId) + + exists, err := c.rdb.Exists(ctx, key).Result() + if err != nil || exists == 0 { + return nil + } + + vals, err := c.rdb.LRange(ctx, key, 0, int64(limit)-1).Result() + if err != nil || len(vals) == 0 { + return nil + } + + msgs := make([]*Message, 0, len(vals)) + for _, v := range vals { + var msg Message + if json.Unmarshal([]byte(v), &msg) == nil { + msgs = append(msgs, &msg) + } + } + return msgs +} + +func (c *CachedStore) warmCache(sessionId int64, msgs []*Message) { + ctx := context.Background() + key := msgKey(sessionId) + + pipe := c.rdb.Pipeline() + pipe.Del(ctx, key) + for _, msg := range msgs { + data, _ := json.Marshal(msg) + pipe.RPush(ctx, key, data) + } + pipe.LTrim(ctx, key, -msgCacheLimit, -1) + pipe.Expire(ctx, key, msgCacheTTL) + pipe.Exec(ctx) +} diff --git a/app/chat/chatcore/mongo.go b/app/chat/chatcore/mongo.go new file mode 100644 index 0000000..6fc0ba9 --- /dev/null +++ b/app/chat/chatcore/mongo.go @@ -0,0 +1,228 @@ +package chatcore + +import ( + "context" + "errors" + "sync/atomic" + "time" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +type MongoStore struct { + sessions *mongo.Collection + messages *mongo.Collection + seqSess atomic.Int64 + seqMsg atomic.Int64 +} + +func NewMongoStore(db *mongo.Database) (*MongoStore, error) { + s := &MongoStore{ + sessions: db.Collection("chat_sessions"), + messages: db.Collection("chat_messages"), + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + s.messages.Indexes().CreateOne(ctx, mongo.IndexModel{ + Keys: bson.D{{Key: "sessionId", Value: 1}, {Key: "createdAt", Value: 1}}, + }) + s.sessions.Indexes().CreateOne(ctx, mongo.IndexModel{ + Keys: bson.D{{Key: "participants", Value: 1}}, + }) + + s.seqSess.Store(time.Now().UnixMilli()) + s.seqMsg.Store(time.Now().UnixMilli()) + + return s, nil +} + +func (s *MongoStore) nextSessionID() int64 { return s.seqSess.Add(1) } +func (s *MongoStore) nextMessageID() int64 { return s.seqMsg.Add(1) } + +func (s *MongoStore) CreateSession(typ SessionType, name string, creatorId int64, participants []int64) (*Session, error) { + if creatorId <= 0 { + return nil, errors.New("creatorId is required") + } + + ps := append([]int64(nil), participants...) + hasCreator := false + for _, p := range ps { + if p == creatorId { + hasCreator = true + break + } + } + if !hasCreator { + ps = append(ps, creatorId) + } + + now := time.Now().Unix() + sess := &Session{ + Id: s.nextSessionID(), + Type: typ, + Name: name, + CreatorId: creatorId, + Participants: ps, + CreatedAt: now, + UpdatedAt: now, + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if _, err := s.sessions.InsertOne(ctx, sess); err != nil { + return nil, err + } + return sess, nil +} + +func (s *MongoStore) GetSession(id int64) (*Session, error) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var sess Session + err := s.sessions.FindOne(ctx, bson.M{"_id": id}).Decode(&sess) + if err != nil { + if errors.Is(err, mongo.ErrNoDocuments) { + return nil, errors.New("session not found") + } + return nil, err + } + return &sess, nil +} + +func (s *MongoStore) ListUserSessions(userId int64, page, limit int) []*Session { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if limit <= 0 { + limit = 20 + } + opts := options.Find(). + SetSkip(int64(page * limit)). + SetLimit(int64(limit)). + SetSort(bson.D{{Key: "updatedAt", Value: -1}}) + + cursor, err := s.sessions.Find(ctx, bson.M{"participants": userId}, opts) + if err != nil { + return nil + } + defer cursor.Close(ctx) + + var results []*Session + cursor.All(ctx, &results) + return results +} + +func (s *MongoStore) AddParticipant(sessionId, userId int64) error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _, err := s.sessions.UpdateOne(ctx, + bson.M{"_id": sessionId}, + bson.M{ + "$addToSet": bson.M{"participants": userId}, + "$set": bson.M{"updatedAt": time.Now().Unix()}, + }, + ) + return err +} + +func (s *MongoStore) RemoveParticipant(sessionId, userId int64) error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _, err := s.sessions.UpdateOne(ctx, + bson.M{"_id": sessionId}, + bson.M{ + "$pull": bson.M{"participants": userId}, + "$set": bson.M{"updatedAt": time.Now().Unix()}, + }, + ) + return err +} + +func (s *MongoStore) DeleteSession(id int64) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + s.sessions.DeleteOne(ctx, bson.M{"_id": id}) + s.messages.DeleteMany(ctx, bson.M{"sessionId": id}) +} + +func (s *MongoStore) AddMessage(sessionId, senderId int64, msgType MessageType, content string) (*Message, error) { + if sessionId <= 0 { + return nil, errors.New("sessionId is required") + } + if senderId <= 0 { + return nil, errors.New("senderId is required") + } + if content == "" { + return nil, errors.New("content is required") + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var sess Session + err := s.sessions.FindOne(ctx, bson.M{"_id": sessionId}).Decode(&sess) + if err != nil { + return nil, errors.New("session not found") + } + + if msgType == "" { + msgType = MessageTypeText + } + now := time.Now().Unix() + + msg := &Message{ + Id: s.nextMessageID(), + SessionId: sessionId, + SenderId: senderId, + Type: msgType, + Content: content, + CreatedAt: now, + } + + if _, err := s.messages.InsertOne(ctx, msg); err != nil { + return nil, err + } + + s.sessions.UpdateOne(ctx, + bson.M{"_id": sessionId}, + bson.M{"$set": bson.M{ + "lastMessage": content, + "lastMessageAt": now, + "updatedAt": now, + }}, + ) + + return msg, nil +} + +func (s *MongoStore) GetMessages(sessionId int64, page, limit int) []*Message { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if limit <= 0 { + limit = 50 + } + opts := options.Find(). + SetSkip(int64(page * limit)). + SetLimit(int64(limit)). + SetSort(bson.D{{Key: "createdAt", Value: 1}}) + + cursor, err := s.messages.Find(ctx, bson.M{"sessionId": sessionId}, opts) + if err != nil { + return nil + } + defer cursor.Close(ctx) + + var results []*Message + cursor.All(ctx, &results) + return results +}