package chatcore import ( "context" "errors" "sync/atomic" "time" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) type MongoStore struct { sessions *mongo.Collection messages *mongo.Collection seqSess atomic.Int64 seqMsg atomic.Int64 } func NewMongoStore(db *mongo.Database) (*MongoStore, error) { s := &MongoStore{ sessions: db.Collection("chat_sessions"), messages: db.Collection("chat_messages"), } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() s.messages.Indexes().CreateOne(ctx, mongo.IndexModel{ Keys: bson.D{{Key: "sessionId", Value: 1}, {Key: "createdAt", Value: 1}}, }) s.sessions.Indexes().CreateOne(ctx, mongo.IndexModel{ Keys: bson.D{{Key: "participants", Value: 1}}, }) s.seqSess.Store(time.Now().UnixMilli()) s.seqMsg.Store(time.Now().UnixMilli()) return s, nil } func (s *MongoStore) nextSessionID() int64 { return s.seqSess.Add(1) } func (s *MongoStore) nextMessageID() int64 { return s.seqMsg.Add(1) } func (s *MongoStore) CreateSession(typ SessionType, name string, creatorId int64, participants []int64) (*Session, error) { if creatorId <= 0 { return nil, errors.New("creatorId is required") } ps := append([]int64(nil), participants...) hasCreator := false for _, p := range ps { if p == creatorId { hasCreator = true break } } if !hasCreator { ps = append(ps, creatorId) } now := time.Now().Unix() sess := &Session{ Id: s.nextSessionID(), Type: typ, Name: name, CreatorId: creatorId, Participants: ps, CreatedAt: now, UpdatedAt: now, } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if _, err := s.sessions.InsertOne(ctx, sess); err != nil { return nil, err } return sess, nil } func (s *MongoStore) GetSession(id int64) (*Session, error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() var sess Session err := s.sessions.FindOne(ctx, bson.M{"_id": id}).Decode(&sess) if err != nil { if errors.Is(err, mongo.ErrNoDocuments) { return nil, errors.New("session not found") } return nil, err } return &sess, nil } func (s *MongoStore) ListUserSessions(userId int64, page, limit int) []*Session { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if limit <= 0 { limit = 20 } opts := options.Find(). SetSkip(int64(page * limit)). SetLimit(int64(limit)). SetSort(bson.D{{Key: "updatedAt", Value: -1}}) cursor, err := s.sessions.Find(ctx, bson.M{"participants": userId}, opts) if err != nil { return nil } defer cursor.Close(ctx) var results []*Session cursor.All(ctx, &results) return results } func (s *MongoStore) AddParticipant(sessionId, userId int64) error { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() _, err := s.sessions.UpdateOne(ctx, bson.M{"_id": sessionId}, bson.M{ "$addToSet": bson.M{"participants": userId}, "$set": bson.M{"updatedAt": time.Now().Unix()}, }, ) return err } func (s *MongoStore) RemoveParticipant(sessionId, userId int64) error { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() _, err := s.sessions.UpdateOne(ctx, bson.M{"_id": sessionId}, bson.M{ "$pull": bson.M{"participants": userId}, "$set": bson.M{"updatedAt": time.Now().Unix()}, }, ) return err } func (s *MongoStore) DeleteSession(id int64) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() s.sessions.DeleteOne(ctx, bson.M{"_id": id}) s.messages.DeleteMany(ctx, bson.M{"sessionId": id}) } func (s *MongoStore) AddMessage(sessionId, senderId int64, msgType MessageType, content string) (*Message, error) { if sessionId <= 0 { return nil, errors.New("sessionId is required") } if senderId <= 0 { return nil, errors.New("senderId is required") } if content == "" { return nil, errors.New("content is required") } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() var sess Session err := s.sessions.FindOne(ctx, bson.M{"_id": sessionId}).Decode(&sess) if err != nil { return nil, errors.New("session not found") } if msgType == "" { msgType = MessageTypeText } now := time.Now().Unix() msg := &Message{ Id: s.nextMessageID(), SessionId: sessionId, SenderId: senderId, Type: msgType, Content: content, CreatedAt: now, } if _, err := s.messages.InsertOne(ctx, msg); err != nil { return nil, err } s.sessions.UpdateOne(ctx, bson.M{"_id": sessionId}, bson.M{"$set": bson.M{ "lastMessage": content, "lastMessageAt": now, "updatedAt": now, }}, ) return msg, nil } func (s *MongoStore) GetMessages(sessionId int64, page, limit int) []*Message { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if limit <= 0 { limit = 50 } opts := options.Find(). SetSkip(int64(page * limit)). SetLimit(int64(limit)). SetSort(bson.D{{Key: "createdAt", Value: 1}}) cursor, err := s.messages.Find(ctx, bson.M{"sessionId": sessionId}, opts) if err != nil { return nil } defer cursor.Close(ctx) var results []*Message cursor.All(ctx, &results) return results }