add: anowflake email kafka, refa: redis connectg
This commit is contained in:
@@ -0,0 +1,96 @@
|
||||
package redisx
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
"github.com/zeromicro/go-zero/core/stores/cache"
|
||||
)
|
||||
|
||||
type MasterSlaveCluster struct {
|
||||
Client *redis.ClusterClient
|
||||
MasterHost string
|
||||
SlaveHost string
|
||||
HasSlave bool
|
||||
}
|
||||
|
||||
func NewMasterSlaveCluster(cacheConf cache.CacheConf) *MasterSlaveCluster {
|
||||
cacheConf = filterCacheConf(cacheConf)
|
||||
if len(cacheConf) == 0 {
|
||||
return &MasterSlaveCluster{}
|
||||
}
|
||||
|
||||
master := cacheConf[0]
|
||||
slave := cacheConf[0]
|
||||
hasSlave := len(cacheConf) > 1
|
||||
if hasSlave {
|
||||
slave = cacheConf[1]
|
||||
}
|
||||
|
||||
client := redis.NewClusterClient(&redis.ClusterOptions{
|
||||
Addrs: []string{master.Host},
|
||||
Username: master.User,
|
||||
Password: master.Pass,
|
||||
ReadOnly: hasSlave,
|
||||
ClusterSlots: func(ctx context.Context) ([]redis.ClusterSlot, error) {
|
||||
nodes := []redis.ClusterNode{{Addr: master.Host}}
|
||||
if hasSlave {
|
||||
nodes = append(nodes, redis.ClusterNode{Addr: slave.Host})
|
||||
}
|
||||
|
||||
return []redis.ClusterSlot{{
|
||||
Start: 0,
|
||||
End: 16383,
|
||||
Nodes: nodes,
|
||||
}}, nil
|
||||
},
|
||||
})
|
||||
|
||||
return &MasterSlaveCluster{
|
||||
Client: client,
|
||||
MasterHost: master.Host,
|
||||
SlaveHost: slave.Host,
|
||||
HasSlave: hasSlave,
|
||||
}
|
||||
}
|
||||
|
||||
func ConnectMasterSlaveCluster(cacheConf cache.CacheConf, timeout time.Duration) (*MasterSlaveCluster, error) {
|
||||
cluster := NewMasterSlaveCluster(cacheConf)
|
||||
if cluster == nil || cluster.Client == nil {
|
||||
return cluster, nil
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
|
||||
if err := cluster.Ping(ctx); err != nil {
|
||||
return cluster, err
|
||||
}
|
||||
|
||||
return cluster, nil
|
||||
}
|
||||
|
||||
func (m *MasterSlaveCluster) Ping(ctx context.Context) error {
|
||||
if m == nil || m.Client == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return m.Client.Ping(ctx).Err()
|
||||
}
|
||||
|
||||
func filterCacheConf(cacheConf cache.CacheConf) cache.CacheConf {
|
||||
if len(cacheConf) == 0 {
|
||||
return cacheConf
|
||||
}
|
||||
|
||||
filtered := make(cache.CacheConf, 0, len(cacheConf))
|
||||
for _, node := range cacheConf {
|
||||
if node.Host == "" {
|
||||
continue
|
||||
}
|
||||
filtered = append(filtered, node)
|
||||
}
|
||||
|
||||
return filtered
|
||||
}
|
||||
Reference in New Issue
Block a user