Database and caching logic
This commit is contained in:
@@ -0,0 +1,128 @@
|
||||
package chatcore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
const (
|
||||
msgCachePrefix = "chat:msgs:"
|
||||
msgCacheLimit = 100
|
||||
msgCacheTTL = 30 * time.Minute
|
||||
)
|
||||
|
||||
type CachedStore struct {
|
||||
inner Store
|
||||
rdb redis.UniversalClient
|
||||
}
|
||||
|
||||
func NewCachedStore(inner Store, rdb redis.UniversalClient) *CachedStore {
|
||||
return &CachedStore{inner: inner, rdb: rdb}
|
||||
}
|
||||
|
||||
func msgKey(sessionId int64) string {
|
||||
return fmt.Sprintf("%s%d", msgCachePrefix, sessionId)
|
||||
}
|
||||
|
||||
func (c *CachedStore) CreateSession(typ SessionType, name string, creatorId int64, participants []int64) (*Session, error) {
|
||||
return c.inner.CreateSession(typ, name, creatorId, participants)
|
||||
}
|
||||
|
||||
func (c *CachedStore) GetSession(id int64) (*Session, error) {
|
||||
return c.inner.GetSession(id)
|
||||
}
|
||||
|
||||
func (c *CachedStore) ListUserSessions(userId int64, page, limit int) []*Session {
|
||||
return c.inner.ListUserSessions(userId, page, limit)
|
||||
}
|
||||
|
||||
func (c *CachedStore) AddParticipant(sessionId, userId int64) error {
|
||||
return c.inner.AddParticipant(sessionId, userId)
|
||||
}
|
||||
|
||||
func (c *CachedStore) RemoveParticipant(sessionId, userId int64) error {
|
||||
return c.inner.RemoveParticipant(sessionId, userId)
|
||||
}
|
||||
|
||||
func (c *CachedStore) DeleteSession(id int64) {
|
||||
c.inner.DeleteSession(id)
|
||||
c.rdb.Del(context.Background(), msgKey(id))
|
||||
}
|
||||
|
||||
func (c *CachedStore) AddMessage(sessionId, senderId int64, msgType MessageType, content string) (*Message, error) {
|
||||
msg, err := c.inner.AddMessage(sessionId, senderId, msgType, content)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
data, _ := json.Marshal(msg)
|
||||
ctx := context.Background()
|
||||
key := msgKey(sessionId)
|
||||
|
||||
pipe := c.rdb.Pipeline()
|
||||
pipe.RPush(ctx, key, data)
|
||||
pipe.LTrim(ctx, key, -msgCacheLimit, -1)
|
||||
pipe.Expire(ctx, key, msgCacheTTL)
|
||||
pipe.Exec(ctx)
|
||||
|
||||
return msg, nil
|
||||
}
|
||||
|
||||
func (c *CachedStore) GetMessages(sessionId int64, page, limit int) []*Message {
|
||||
if page == 0 && limit <= msgCacheLimit {
|
||||
if msgs := c.getFromCache(sessionId, limit); msgs != nil {
|
||||
return msgs
|
||||
}
|
||||
}
|
||||
|
||||
msgs := c.inner.GetMessages(sessionId, page, limit)
|
||||
|
||||
if page == 0 && len(msgs) > 0 {
|
||||
c.warmCache(sessionId, msgs)
|
||||
}
|
||||
|
||||
return msgs
|
||||
}
|
||||
|
||||
func (c *CachedStore) getFromCache(sessionId int64, limit int) []*Message {
|
||||
ctx := context.Background()
|
||||
key := msgKey(sessionId)
|
||||
|
||||
exists, err := c.rdb.Exists(ctx, key).Result()
|
||||
if err != nil || exists == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
vals, err := c.rdb.LRange(ctx, key, 0, int64(limit)-1).Result()
|
||||
if err != nil || len(vals) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
msgs := make([]*Message, 0, len(vals))
|
||||
for _, v := range vals {
|
||||
var msg Message
|
||||
if json.Unmarshal([]byte(v), &msg) == nil {
|
||||
msgs = append(msgs, &msg)
|
||||
}
|
||||
}
|
||||
return msgs
|
||||
}
|
||||
|
||||
func (c *CachedStore) warmCache(sessionId int64, msgs []*Message) {
|
||||
ctx := context.Background()
|
||||
key := msgKey(sessionId)
|
||||
|
||||
pipe := c.rdb.Pipeline()
|
||||
pipe.Del(ctx, key)
|
||||
for _, msg := range msgs {
|
||||
data, _ := json.Marshal(msg)
|
||||
pipe.RPush(ctx, key, data)
|
||||
}
|
||||
pipe.LTrim(ctx, key, -msgCacheLimit, -1)
|
||||
pipe.Expire(ctx, key, msgCacheTTL)
|
||||
pipe.Exec(ctx)
|
||||
}
|
||||
Reference in New Issue
Block a user