129 lines
2.9 KiB
Go
129 lines
2.9 KiB
Go
package chatcore
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/redis/go-redis/v9"
|
|
)
|
|
|
|
const (
|
|
msgCachePrefix = "chat:msgs:"
|
|
msgCacheLimit = 100
|
|
msgCacheTTL = 30 * time.Minute
|
|
)
|
|
|
|
type CachedStore struct {
|
|
inner Store
|
|
rdb redis.UniversalClient
|
|
}
|
|
|
|
func NewCachedStore(inner Store, rdb redis.UniversalClient) *CachedStore {
|
|
return &CachedStore{inner: inner, rdb: rdb}
|
|
}
|
|
|
|
func msgKey(sessionId int64) string {
|
|
return fmt.Sprintf("%s%d", msgCachePrefix, sessionId)
|
|
}
|
|
|
|
func (c *CachedStore) CreateSession(typ SessionType, name string, creatorId int64, participants []int64) (*Session, error) {
|
|
return c.inner.CreateSession(typ, name, creatorId, participants)
|
|
}
|
|
|
|
func (c *CachedStore) GetSession(id int64) (*Session, error) {
|
|
return c.inner.GetSession(id)
|
|
}
|
|
|
|
func (c *CachedStore) ListUserSessions(userId int64, page, limit int) []*Session {
|
|
return c.inner.ListUserSessions(userId, page, limit)
|
|
}
|
|
|
|
func (c *CachedStore) AddParticipant(sessionId, userId int64) error {
|
|
return c.inner.AddParticipant(sessionId, userId)
|
|
}
|
|
|
|
func (c *CachedStore) RemoveParticipant(sessionId, userId int64) error {
|
|
return c.inner.RemoveParticipant(sessionId, userId)
|
|
}
|
|
|
|
func (c *CachedStore) DeleteSession(id int64) {
|
|
c.inner.DeleteSession(id)
|
|
c.rdb.Del(context.Background(), msgKey(id))
|
|
}
|
|
|
|
func (c *CachedStore) AddMessage(sessionId, senderId int64, msgType MessageType, content string) (*Message, error) {
|
|
msg, err := c.inner.AddMessage(sessionId, senderId, msgType, content)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
data, _ := json.Marshal(msg)
|
|
ctx := context.Background()
|
|
key := msgKey(sessionId)
|
|
|
|
pipe := c.rdb.Pipeline()
|
|
pipe.RPush(ctx, key, data)
|
|
pipe.LTrim(ctx, key, -msgCacheLimit, -1)
|
|
pipe.Expire(ctx, key, msgCacheTTL)
|
|
pipe.Exec(ctx)
|
|
|
|
return msg, nil
|
|
}
|
|
|
|
func (c *CachedStore) GetMessages(sessionId int64, page, limit int) []*Message {
|
|
if page == 0 && limit <= msgCacheLimit {
|
|
if msgs := c.getFromCache(sessionId, limit); msgs != nil {
|
|
return msgs
|
|
}
|
|
}
|
|
|
|
msgs := c.inner.GetMessages(sessionId, page, limit)
|
|
|
|
if page == 0 && len(msgs) > 0 {
|
|
c.warmCache(sessionId, msgs)
|
|
}
|
|
|
|
return msgs
|
|
}
|
|
|
|
func (c *CachedStore) getFromCache(sessionId int64, limit int) []*Message {
|
|
ctx := context.Background()
|
|
key := msgKey(sessionId)
|
|
|
|
exists, err := c.rdb.Exists(ctx, key).Result()
|
|
if err != nil || exists == 0 {
|
|
return nil
|
|
}
|
|
|
|
vals, err := c.rdb.LRange(ctx, key, 0, int64(limit)-1).Result()
|
|
if err != nil || len(vals) == 0 {
|
|
return nil
|
|
}
|
|
|
|
msgs := make([]*Message, 0, len(vals))
|
|
for _, v := range vals {
|
|
var msg Message
|
|
if json.Unmarshal([]byte(v), &msg) == nil {
|
|
msgs = append(msgs, &msg)
|
|
}
|
|
}
|
|
return msgs
|
|
}
|
|
|
|
func (c *CachedStore) warmCache(sessionId int64, msgs []*Message) {
|
|
ctx := context.Background()
|
|
key := msgKey(sessionId)
|
|
|
|
pipe := c.rdb.Pipeline()
|
|
pipe.Del(ctx, key)
|
|
for _, msg := range msgs {
|
|
data, _ := json.Marshal(msg)
|
|
pipe.RPush(ctx, key, data)
|
|
}
|
|
pipe.LTrim(ctx, key, -msgCacheLimit, -1)
|
|
pipe.Expire(ctx, key, msgCacheTTL)
|
|
pipe.Exec(ctx)
|
|
}
|