一、Wrapping go-redis in practice: the client model
// Copyright 2020 Lingfei Kong <colin404@foxmail.com>. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package storage
import (
"context"
"crypto/tls"
"fmt"
"strconv"
"strings"
"sync/atomic"
"time"
redis "github.com/go-redis/redis/v7"
"github.com/marmotedu/errors"
uuid "github.com/satori/go.uuid"
"github.com/spf13/viper"
"github.com/marmotedu/iam/pkg/log"
)
// Config defines options for redis cluster.
type Config struct {
Host string
Port int
Addrs []string
MasterName string
Username string
Password string
Database int
MaxIdle int
MaxActive int
Timeout int
EnableCluster bool
UseSSL bool
SSLInsecureSkipVerify bool
}
// ErrRedisIsDown is returned when we can't communicate with redis.
var ErrRedisIsDown = errors.New("storage: Redis is either down or was not configured")
var (
singlePool atomic.Value
singleCachePool atomic.Value
redisUp atomic.Value
)
var disableRedis atomic.Value
// DisableRedis is very handy when testing; it allows you to dynamically enable/disable talking with Redis.
func DisableRedis(ok bool) {
if ok {
redisUp.Store(false)
disableRedis.Store(true)
return
}
redisUp.Store(true)
disableRedis.Store(false)
}
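// For example, a failure-injection test can cut redis off and restore it when
// done (a hypothetical usage sketch, not part of this package):
//
//	storage.DisableRedis(true) // subsequent storage calls fail fast with ErrRedisIsDown
//	defer storage.DisableRedis(false)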
func shouldConnect() bool {
if v := disableRedis.Load(); v != nil {
return !v.(bool)
}
return true
}
// Connected returns true if we are connected to redis.
func Connected() bool {
if v := redisUp.Load(); v != nil {
return v.(bool)
}
return false
}
// singleton returns the redis client from the pool (the cache pool if cache is true), or nil if it has not been created yet.
func singleton(cache bool) redis.UniversalClient {
if cache {
v := singleCachePool.Load()
if v != nil {
return v.(redis.UniversalClient)
}
return nil
}
if v := singlePool.Load(); v != nil {
return v.(redis.UniversalClient)
}
return nil
}
// connectSingleton lazily creates the client on first use; it currently always reports success.
func connectSingleton(cache bool, config *Config) bool {
if singleton(cache) == nil {
log.Debug("Connecting to redis cluster")
if cache {
singleCachePool.Store(NewRedisClusterPool(cache, config))
return true
}
singlePool.Store(NewRedisClusterPool(cache, config))
return true
}
return true
}
// RedisCluster is a storage manager that uses the redis database.
type RedisCluster struct {
KeyPrefix string
HashKeys bool
IsCache bool
}
// clusterConnectionIsOpen verifies that redis is reachable by writing and reading back a short-lived test key.
func clusterConnectionIsOpen(cluster RedisCluster) bool {
c := singleton(cluster.IsCache)
testKey := "redis-test-" + uuid.Must(uuid.NewV4()).String()
if err := c.Set(testKey, "test", time.Second).Err(); err != nil {
log.Warnf("Error trying to set test key: %s", err.Error())
return false
}
if _, err := c.Get(testKey).Result(); err != nil {
log.Warnf("Error trying to get test key: %s", err.Error())
return false
}
return true
}
// ConnectToRedis periodically tries to connect to redis; run it in its own goroutine.
func ConnectToRedis(ctx context.Context, config *Config) {
tick := time.NewTicker(time.Second)
defer tick.Stop()
c := []RedisCluster{
{}, {IsCache: true},
}
var ok bool
for _, v := range c {
if !connectSingleton(v.IsCache, config) {
break
}
if !clusterConnectionIsOpen(v) {
redisUp.Store(false)
break
}
ok = true
}
redisUp.Store(ok)
again: // keep probing the connection; if redis becomes unreachable, flip redisUp back to false
for {
select {
case <-ctx.Done():
return
case <-tick.C:
if !shouldConnect() {
continue
}
// make sure a client exists for each pool
for _, v := range c {
if !connectSingleton(v.IsCache, config) {
redisUp.Store(false)
goto again
}
if !clusterConnectionIsOpen(v) {
redisUp.Store(false)
goto again
}
}
redisUp.Store(true)
}
}
}
// NewRedisClusterPool creates a redis client pool (single-node, sentinel failover, or cluster).
func NewRedisClusterPool(isCache bool, config *Config) redis.UniversalClient {
log.Debug("Creating new Redis connection pool")
// poolSize applies per cluster node and not for the whole cluster.
poolSize := 500
if config.MaxActive > 0 {
poolSize = config.MaxActive
}
timeout := 5 * time.Second
if config.Timeout > 0 {
timeout = time.Duration(config.Timeout) * time.Second
}
var tlsConfig *tls.Config
if config.UseSSL {
tlsConfig = &tls.Config{
InsecureSkipVerify: config.SSLInsecureSkipVerify,
}
}
var client redis.UniversalClient
opts := &RedisOpts{
Addrs: getRedisAddrs(config),
MasterName: config.MasterName,
Password: config.Password,
DB: config.Database,
DialTimeout: timeout,
ReadTimeout: timeout,
WriteTimeout: timeout,
IdleTimeout: 240 * timeout,
PoolSize: poolSize,
TLSConfig: tlsConfig,
}
if opts.MasterName != "" {
log.Info("--> [REDIS] Creating sentinel-backed failover client")
client = redis.NewFailoverClient(opts.failover()) // a client that follows master failover under Redis Sentinel
} else if config.EnableCluster {
log.Info("--> [REDIS] Creating cluster client")
client = redis.NewClusterClient(opts.cluster())
} else {
log.Info("--> [REDIS] Creating single-node client")
client = redis.NewClient(opts.simple())
}
return client
}
func getRedisAddrs(config *Config) (addrs []string) {
if len(config.Addrs) != 0 {
addrs = config.Addrs
}
if len(addrs) == 0 && config.Port != 0 {
addr := config.Host + ":" + strconv.Itoa(config.Port)
addrs = append(addrs, addr)
}
return addrs
}
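// For example (a quick sketch of the precedence): explicit Addrs always win,
// otherwise Host and Port are joined.
//
//	getRedisAddrs(&Config{Addrs: []string{"10.0.0.1:6379"}}) // ["10.0.0.1:6379"]
//	getRedisAddrs(&Config{Host: "127.0.0.1", Port: 6379})    // ["127.0.0.1:6379"]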
// RedisOpts is an overridden type of redis.UniversalOptions. The simple() and
// cluster() functions are not public in the redis library, so they are redefined
// here for use when creating the redis clients; we don't want to use the
// redis.NewUniversalClient() logic.
type RedisOpts redis.UniversalOptions
func (o *RedisOpts) cluster() *redis.ClusterOptions {
if len(o.Addrs) == 0 {
o.Addrs = []string{"127.0.0.1:6379"}
}
return &redis.ClusterOptions{
Addrs: o.Addrs,
OnConnect: o.OnConnect,
Password: o.Password,
MaxRedirects: o.MaxRedirects,
ReadOnly: o.ReadOnly,
RouteByLatency: o.RouteByLatency,
RouteRandomly: o.RouteRandomly,
MaxRetries: o.MaxRetries,
MinRetryBackoff: o.MinRetryBackoff,
MaxRetryBackoff: o.MaxRetryBackoff,
DialTimeout: o.DialTimeout,
ReadTimeout: o.ReadTimeout,
WriteTimeout: o.WriteTimeout,
PoolSize: o.PoolSize,
MinIdleConns: o.MinIdleConns,
MaxConnAge: o.MaxConnAge,
PoolTimeout: o.PoolTimeout,
IdleTimeout: o.IdleTimeout,
IdleCheckFrequency: o.IdleCheckFrequency,
TLSConfig: o.TLSConfig,
}
}
// simple builds the options for a single-node redis client.
func (o *RedisOpts) simple() *redis.Options {
addr := "127.0.0.1:6379"
if len(o.Addrs) > 0 {
addr = o.Addrs[0]
}
return &redis.Options{
Addr: addr,
OnConnect: o.OnConnect,
DB: o.DB,
Password: o.Password,
MaxRetries: o.MaxRetries,
MinRetryBackoff: o.MinRetryBackoff,
MaxRetryBackoff: o.MaxRetryBackoff,
DialTimeout: o.DialTimeout,
ReadTimeout: o.ReadTimeout,
WriteTimeout: o.WriteTimeout,
PoolSize: o.PoolSize,
MinIdleConns: o.MinIdleConns,
MaxConnAge: o.MaxConnAge,
PoolTimeout: o.PoolTimeout,
IdleTimeout: o.IdleTimeout,
IdleCheckFrequency: o.IdleCheckFrequency,
TLSConfig: o.TLSConfig,
}
}
// failover builds the options for a sentinel-backed failover client.
func (o *RedisOpts) failover() *redis.FailoverOptions {
if len(o.Addrs) == 0 {
o.Addrs = []string{"127.0.0.1:26379"}
}
return &redis.FailoverOptions{
SentinelAddrs: o.Addrs,
MasterName: o.MasterName,
OnConnect: o.OnConnect,
DB: o.DB,
Password: o.Password,
MaxRetries: o.MaxRetries,
MinRetryBackoff: o.MinRetryBackoff,
MaxRetryBackoff: o.MaxRetryBackoff,
DialTimeout: o.DialTimeout,
ReadTimeout: o.ReadTimeout,
WriteTimeout: o.WriteTimeout,
PoolSize: o.PoolSize,
MinIdleConns: o.MinIdleConns,
MaxConnAge: o.MaxConnAge,
PoolTimeout: o.PoolTimeout,
IdleTimeout: o.IdleTimeout,
IdleCheckFrequency: o.IdleCheckFrequency,
TLSConfig: o.TLSConfig,
}
}
// Connect will establish a connection; this always returns true because the connection is managed dynamically in the background.
func (r *RedisCluster) Connect() bool {
return true
}
func (r *RedisCluster) singleton() redis.UniversalClient {
return singleton(r.IsCache)
}
func (r *RedisCluster) hashKey(in string) string {
if !r.HashKeys {
// Not hashing? Return the raw key
return in
}
return HashStr(in)
}
func (r *RedisCluster) fixKey(keyName string) string {
return r.KeyPrefix + r.hashKey(keyName)
}
func (r *RedisCluster) cleanKey(keyName string) string {
return strings.Replace(keyName, r.KeyPrefix, "", 1)
}
func (r *RedisCluster) up() error {
if !Connected() {
return ErrRedisIsDown
}
return nil
}
// GetKey will retrieve a key from the database.
func (r *RedisCluster) GetKey(keyName string) (string, error) {
if err := r.up(); err != nil {
return "", err
}
cluster := r.singleton()
value, err := cluster.Get(r.fixKey(keyName)).Result()
if err != nil {
log.Debugf("Error trying to get value: %s", err.Error())
return "", ErrKeyNotFound
}
return value, nil
}
// GetMultiKey gets multiple keys from the database.
func (r *RedisCluster) GetMultiKey(keys []string) ([]string, error) {
if err := r.up(); err != nil {
return nil, err
}
cluster := r.singleton()
keyNames := make([]string, len(keys))
copy(keyNames, keys)
for index, val := range keyNames {
keyNames[index] = r.fixKey(val)
}
result := make([]string, 0)
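// In cluster mode the keys may hash to different slots, so a single MGET is
// not usable; issue one GET per key through a pipeline instead. The
// single-node client below can use MGET directly.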
switch v := cluster.(type) {
case *redis.ClusterClient:
{
getCmds := make([]*redis.StringCmd, 0)
pipe := v.Pipeline()
for _, key := range keyNames {
getCmds = append(getCmds, pipe.Get(key))
}
_, err := pipe.Exec()
if err != nil && !errors.Is(err, redis.Nil) {
log.Debugf("Error trying to get value: %s", err.Error())
return nil, ErrKeyNotFound
}
for _, cmd := range getCmds {
result = append(result, cmd.Val())
}
}
case *redis.Client:
{
values, err := cluster.MGet(keyNames...).Result()
if err != nil {
log.Debugf("Error trying to get value: %s", err.Error())
return nil, ErrKeyNotFound
}
for _, val := range values {
strVal := fmt.Sprint(val)
if strVal == "<nil>" {
strVal = ""
}
result = append(result, strVal)
}
}
}
for _, val := range result {
if val != "" {
return result, nil
}
}
return nil, ErrKeyNotFound
}
// GetKeyTTL returns the TTL of the given key.
func (r *RedisCluster) GetKeyTTL(keyName string) (ttl int64, err error) {
if err = r.up(); err != nil {
return 0, err
}
duration, err := r.singleton().TTL(r.fixKey(keyName)).Result()
return int64(duration.Seconds()), err
}
// GetRawKey returns the value of the given key.
func (r *RedisCluster) GetRawKey(keyName string) (string, error) {
if err := r.up(); err != nil {
return "", err
}
value, err := r.singleton().Get(keyName).Result()
if err != nil {
log.Debugf("Error trying to get value: %s", err.Error())
return "", ErrKeyNotFound
}
return value, nil
}
// GetExp returns the expiry of the given key.
func (r *RedisCluster) GetExp(keyName string) (int64, error) {
log.Debugf("Getting exp for key: %s", r.fixKey(keyName))
if err := r.up(); err != nil {
return 0, err
}
value, err := r.singleton().TTL(r.fixKey(keyName)).Result()
if err != nil {
log.Errorf("Error trying to get TTL: ", err.Error())
return 0, ErrKeyNotFound
}
return int64(value.Seconds()), nil
}
// SetExp sets the expiry of the given key.
func (r *RedisCluster) SetExp(keyName string, timeout time.Duration) error {
if err := r.up(); err != nil {
return err
}
err := r.singleton().Expire(r.fixKey(keyName), timeout).Err()
if err != nil {
log.Errorf("Could not EXPIRE key: %s", err.Error())
}
return err
}
// SetKey will create (or update) a key value in the store.
func (r *RedisCluster) SetKey(keyName, session string, timeout time.Duration) error {
log.Debugf("[STORE] SET Raw key is: %s", keyName)
log.Debugf("[STORE] Setting key: %s", r.fixKey(keyName))
if err := r.up(); err != nil {
return err
}
err := r.singleton().Set(r.fixKey(keyName), session, timeout).Err()
if err != nil {
log.Errorf("Error trying to set value: %s", err.Error())
return err
}
return nil
}
// SetRawKey sets the value of the given key.
func (r *RedisCluster) SetRawKey(keyName, session string, timeout time.Duration) error {
if err := r.up(); err != nil {
return err
}
err := r.singleton().Set(keyName, session, timeout).Err()
if err != nil {
log.Errorf("Error trying to set value: %s", err.Error())
return err
}
return nil
}
// Decrement will decrement a key in redis.
func (r *RedisCluster) Decrement(keyName string) {
keyName = r.fixKey(keyName)
log.Debugf("Decrementing key: %s", keyName)
if err := r.up(); err != nil {
return
}
err := r.singleton().Decr(keyName).Err()
if err != nil {
log.Errorf("Error trying to decrement value: %s", err.Error())
}
}
// IncrememntWithExpire will increment a key in redis.
func (r *RedisCluster) IncrememntWithExpire(keyName string, expire int64) int64 {
log.Debugf("Incrementing raw key: %s", keyName)
if err := r.up(); err != nil {
return 0
}
// This function uses a raw key, so we shouldn't call fixKey
fixedKey := keyName
val, err := r.singleton().Incr(fixedKey).Result()
if err != nil {
log.Errorf("Error trying to increment value: %s", err.Error())
} else {
log.Debugf("Incremented key: %s, val is: %d", fixedKey, val)
}
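// Set the TTL only when the counter was just created (val == 1), so later
// increments do not keep extending the window.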
if val == 1 && expire > 0 {
log.Debug("--> Setting Expire")
r.singleton().Expire(fixedKey, time.Duration(expire)*time.Second)
}
return val
}
// GetKeys will return all keys according to the filter (filter is a prefix - e.g. tyk.keys.*).
func (r *RedisCluster) GetKeys(filter string) []string {
if err := r.up(); err != nil {
return nil
}
client := r.singleton()
filterHash := ""
if filter != "" {
filterHash = r.hashKey(filter)
}
searchStr := r.KeyPrefix + filterHash + "*"
log.Debugf("[STORE] Getting list by: %s", searchStr)
fnFetchKeys := func(client *redis.Client) ([]string, error) {
values := make([]string, 0)
iter := client.Scan(0, searchStr, 0).Iterator()
for iter.Next() {
values = append(values, iter.Val())
}
if err := iter.Err(); err != nil {
return nil, err
}
return values, nil
}
var err error
var values []string
sessions := make([]string, 0)
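// SCAN walks a single node only, so in cluster mode it must run on every
// master; each node's results are streamed back through the channel and merged.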
switch v := client.(type) {
case *redis.ClusterClient:
ch := make(chan []string)
go func() {
err = v.ForEachMaster(func(client *redis.Client) error {
values, err = fnFetchKeys(client)
if err != nil {
return err
}
ch <- values
return nil
})
close(ch)
}()
for res := range ch {
sessions = append(sessions, res...)
}
case *redis.Client:
sessions, err = fnFetchKeys(v)
}
if err != nil {
log.Errorf("Error while fetching keys: %s", err)
return nil
}
for i, v := range sessions {
sessions[i] = r.cleanKey(v)
}
return sessions
}
// GetKeysAndValuesWithFilter will return all keys and their values with a filter.
func (r *RedisCluster) GetKeysAndValuesWithFilter(filter string) map[string]string {
if err := r.up(); err != nil {
return nil
}
keys := r.GetKeys(filter)
if keys == nil {
log.Error("Error trying to get filtered client keys")
return nil
}
if len(keys) == 0 {
return nil
}
for i, v := range keys {
keys[i] = r.KeyPrefix + v
}
client := r.singleton()
values := make([]string, 0)
switch v := client.(type) {
case *redis.ClusterClient:
{
getCmds := make([]*redis.StringCmd, 0)
pipe := v.Pipeline()
for _, key := range keys {
getCmds = append(getCmds, pipe.Get(key))
}
_, err := pipe.Exec()
if err != nil && !errors.Is(err, redis.Nil) {
log.Errorf("Error trying to get client keys: %s", err.Error())
return nil
}
for _, cmd := range getCmds {
values = append(values, cmd.Val())
}
}
case *redis.Client:
{
result, err := v.MGet(keys...).Result()
if err != nil {
log.Errorf("Error trying to get client keys: %s", err.Error())
return nil
}
for _, val := range result {
strVal := fmt.Sprint(val)
if strVal == "<nil>" {
strVal = ""
}
values = append(values, strVal)
}
}
}
m := make(map[string]string)
for i, v := range keys {
m[r.cleanKey(v)] = values[i]
}
return m
}
// GetKeysAndValues will return all keys and their values - not to be used lightly.
func (r *RedisCluster) GetKeysAndValues() map[string]string {
return r.GetKeysAndValuesWithFilter("")
}
// DeleteKey will remove a key from the database.
func (r *RedisCluster) DeleteKey(keyName string) bool {
if err := r.up(); err != nil {
// log.Debug(err)
return false
}
log.Debugf("DEL Key was: %s", keyName)
log.Debugf("DEL Key became: %s", r.fixKey(keyName))
n, err := r.singleton().Del(r.fixKey(keyName)).Result()
if err != nil {
log.Errorf("Error trying to delete key: %s", err.Error())
}
return n > 0
}
// DeleteAllKeys will remove all keys from the database.
func (r *RedisCluster) DeleteAllKeys() bool {
if err := r.up(); err != nil {
return false
}
n, err := r.singleton().FlushAll().Result()
if err != nil {
log.Errorf("Error trying to delete keys: %s", err.Error())
}
if n == "OK" {
return true
}
return false
}
// DeleteRawKey will remove a key from the database without prefixing, assumes user knows what they are doing.
func (r *RedisCluster) DeleteRawKey(keyName string) bool {
if err := r.up(); err != nil {
return false
}
n, err := r.singleton().Del(keyName).Result()
if err != nil {
log.Errorf("Error trying to delete key: %s", err.Error())
}
return n > 0
}
// DeleteScanMatch will remove a group of keys in bulk.
func (r *RedisCluster) DeleteScanMatch(pattern string) bool {
if err := r.up(); err != nil {
return false
}
client := r.singleton()
log.Debugf("Deleting: %s", pattern)
fnScan := func(client *redis.Client) ([]string, error) {
values := make([]string, 0)
iter := client.Scan(0, pattern, 0).Iterator()
for iter.Next() {
values = append(values, iter.Val())
}
if err := iter.Err(); err != nil {
return nil, err
}
return values, nil
}
var err error
var keys []string
var values []string
switch v := client.(type) {
case *redis.ClusterClient:
ch := make(chan []string)
go func() {
err = v.ForEachMaster(func(client *redis.Client) error {
values, err = fnScan(client)
if err != nil {
return err
}
ch <- values
return nil
})
close(ch)
}()
for vals := range ch {
keys = append(keys, vals...)
}
case *redis.Client:
keys, err = fnScan(v)
}
if err != nil {
log.Errorf("SCAN command field with err: %s", err.Error())
return false
}
if len(keys) > 0 {
for _, name := range keys {
log.Infof("Deleting: %s", name)
err := client.Del(name).Err()
if err != nil {
log.Errorf("Error trying to delete key: %s - %s", name, err.Error())
}
}
log.Infof("Deleted: %d records", len(keys))
} else {
log.Debug("RedisCluster called DEL - Nothing to delete")
}
return true
}
// DeleteKeys will remove a group of keys in bulk.
func (r *RedisCluster) DeleteKeys(keys []string) bool {
if err := r.up(); err != nil {
return false
}
if len(keys) > 0 {
for i, v := range keys {
keys[i] = r.fixKey(v)
}
log.Debugf("Deleting: %v", keys)
client := r.singleton()
switch v := client.(type) {
case *redis.ClusterClient:
{
pipe := v.Pipeline()
for _, k := range keys {
pipe.Del(k)
}
if _, err := pipe.Exec(); err != nil {
log.Errorf("Error trying to delete keys: %s", err.Error())
}
}
case *redis.Client:
{
_, err := v.Del(keys...).Result()
if err != nil {
log.Errorf("Error trying to delete keys: %s", err.Error())
}
}
}
} else {
log.Debug("RedisCluster called DEL - Nothing to delete")
}
return true
}
// StartPubSubHandler will listen for a signal and run the callback for
// every subscription and message event.
func (r *RedisCluster) StartPubSubHandler(channel string, callback func(interface{})) error {
if err := r.up(); err != nil {
return err
}
client := r.singleton()
if client == nil {
return errors.New("redis connection failed")
}
pubsub := client.Subscribe(channel)
defer pubsub.Close()
if _, err := pubsub.Receive(); err != nil {
log.Errorf("Error while receiving pubsub message: %s", err.Error())
return err
}
for msg := range pubsub.Channel() {
callback(msg)
}
return nil
}
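// A typical pairing (a sketch; the channel name is hypothetical): run the
// subscriber in its own goroutine and publish from elsewhere.
//
//	go func() {
//		_ = r.StartPubSubHandler("iam.notifications", func(msg interface{}) {
//			if m, ok := msg.(*redis.Message); ok {
//				log.Infof("received: %s", m.Payload)
//			}
//		})
//	}()
//	_ = r.Publish("iam.notifications", "policy-changed")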
// Publish publishes a message to the specified channel.
func (r *RedisCluster) Publish(channel, message string) error {
if err := r.up(); err != nil {
return err
}
err := r.singleton().Publish(channel, message).Err()
if err != nil {
log.Errorf("Error trying to set value: %s", err.Error())
return err
}
return nil
}
// GetAndDeleteSet gets and deletes a key (the whole list is read, then removed, in one transaction).
func (r *RedisCluster) GetAndDeleteSet(keyName string) []interface{} {
log.Debugf("Getting raw key set: %s", keyName)
if err := r.up(); err != nil {
return nil
}
log.Debugf("keyName is: %s", keyName)
fixedKey := r.fixKey(keyName)
log.Debugf("Fixed keyname is: %s", fixedKey)
client := r.singleton()
var lrange *redis.StringSliceCmd
_, err := client.TxPipelined(func(pipe redis.Pipeliner) error {
lrange = pipe.LRange(fixedKey, 0, -1)
pipe.Del(fixedKey)
return nil
})
if err != nil {
log.Errorf("Multi command failed: %s", err.Error())
return nil
}
vals := lrange.Val()
log.Debugf("Analytics returned: %d", len(vals))
if len(vals) == 0 {
return nil
}
log.Debugf("Unpacked vals: %d", len(vals))
result := make([]interface{}, len(vals))
for i, v := range vals {
result[i] = v
}
return result
}
// AppendToSet appends a value to the list at the key (via RPUSH).
func (r *RedisCluster) AppendToSet(keyName, value string) {
fixedKey := r.fixKey(keyName)
log.Debug("Pushing to raw key list", log.String("keyName", keyName))
log.Debug("Appending to fixed key list", log.String("fixedKey", fixedKey))
if err := r.up(); err != nil {
return
}
if err := r.singleton().RPush(fixedKey, value).Err(); err != nil {
log.Errorf("Error trying to append to set keys: %s", err.Error())
}
}
// Exists checks whether keyName exists.
func (r *RedisCluster) Exists(keyName string) (bool, error) {
fixedKey := r.fixKey(keyName)
log.Debug("Checking if exists", log.String("keyName", fixedKey))
exists, err := r.singleton().Exists(fixedKey).Result()
if err != nil {
log.Errorf("Error trying to check if key exists: %s", err.Error())
return false, err
}
if exists == 1 {
return true, nil
}
return false, nil
}
// RemoveFromList deletes a value from the list identified by keyName.
func (r *RedisCluster) RemoveFromList(keyName, value string) error {
fixedKey := r.fixKey(keyName)
log.Debug(
"Removing value from list",
log.String("keyName", keyName),
log.String("fixedKey", fixedKey),
log.String("value", value),
)
if err := r.singleton().LRem(fixedKey, 0, value).Err(); err != nil {
log.Error(
"LREM command failed",
log.String("keyName", keyName),
log.String("fixedKey", fixedKey),
log.String("value", value),
log.String("error", err.Error()),
)
return err
}
return nil
}
// GetListRange gets a range of elements from the list identified by keyName.
func (r *RedisCluster) GetListRange(keyName string, from, to int64) ([]string, error) {
fixedKey := r.fixKey(keyName)
elements, err := r.singleton().LRange(fixedKey, from, to).Result()
if err != nil {
log.Error(
"LRANGE command failed",
log.String(
"keyName",
keyName,
),
log.String("fixedKey", fixedKey),
log.Int64("from", from),
log.Int64("to", to),
log.String("error", err.Error()),
)
return nil, err
}
return elements, nil
}
// AppendToSetPipelined appends values to the key list in a single pipeline.
func (r *RedisCluster) AppendToSetPipelined(key string, values [][]byte) {
if len(values) == 0 {
return
}
fixedKey := r.fixKey(key)
if err := r.up(); err != nil {
log.Debug(err.Error())
return
}
client := r.singleton()
pipe := client.Pipeline()
for _, val := range values {
pipe.RPush(fixedKey, val)
}
if _, err := pipe.Exec(); err != nil {
log.Errorf("Error trying to append to set keys: %s", err.Error())
}
// if we need to set an expiration time
if storageExpTime := int64(viper.GetDuration("analytics.storage-expiration-time")); storageExpTime != int64(-1) {
// If there is no expiry on the analytics set, we should set it.
exp, _ := r.GetExp(key)
if exp == -1 {
_ = r.SetExp(key, time.Duration(storageExpTime)*time.Second)
}
}
}
// GetSet returns the members of the key set.
func (r *RedisCluster) GetSet(keyName string) (map[string]string, error) {
log.Debugf("Getting from key set: %s", keyName)
log.Debugf("Getting from fixed key set: %s", r.fixKey(keyName))
if err := r.up(); err != nil {
return nil, err
}
val, err := r.singleton().SMembers(r.fixKey(keyName)).Result()
if err != nil {
log.Errorf("Error trying to get key set: %s", err.Error())
return nil, err
}
result := make(map[string]string)
for i, value := range val {
result[strconv.Itoa(i)] = value
}
return result, nil
}
// AddToSet adds a value to the key set.
func (r *RedisCluster) AddToSet(keyName, value string) {
log.Debugf("Pushing to raw key set: %s", keyName)
log.Debugf("Pushing to fixed key set: %s", r.fixKey(keyName))
if err := r.up(); err != nil {
return
}
err := r.singleton().SAdd(r.fixKey(keyName), value).Err()
if err != nil {
log.Errorf("Error trying to append keys: %s", err.Error())
}
}
// RemoveFromSet removes a value from the key set.
func (r *RedisCluster) RemoveFromSet(keyName, value string) {
log.Debugf("Removing from raw key set: %s", keyName)
log.Debugf("Removing from fixed key set: %s", r.fixKey(keyName))
if err := r.up(); err != nil {
log.Debug(err.Error())
return
}
err := r.singleton().SRem(r.fixKey(keyName), value).Err()
if err != nil {
log.Errorf("Error trying to remove keys: %s", err.Error())
}
}
// IsMemberOfSet returns whether the given value belongs to the key set.
func (r *RedisCluster) IsMemberOfSet(keyName, value string) bool {
if err := r.up(); err != nil {
log.Debug(err.Error())
return false
}
val, err := r.singleton().SIsMember(r.fixKey(keyName), value).Result()
if err != nil {
log.Errorf("Error trying to check set member: %s", err.Error())
return false
}
log.Debugf("SISMEMBER %s %s %v %v", keyName, value, val, err)
return val
}
// SetRollingWindow will append to a sorted set in redis and extract a timed window of values.
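// The window is kept in a sorted set scored by nanosecond timestamps: members
// older than `per` seconds are trimmed with ZREMRANGEBYSCORE, the survivors are
// read back with ZRANGE, the current request is added with ZADD, and the key is
// given a TTL of `per` seconds; all four commands run in one pipeline.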
func (r *RedisCluster) SetRollingWindow(
keyName string,
per int64,
valueOverride string,
pipeline bool,
) (int, []interface{}) {
log.Debugf("Incrementing raw key: %s", keyName)
if err := r.up(); err != nil {
log.Debug(err.Error())
return 0, nil
}
log.Debugf("keyName is: %s", keyName)
now := time.Now()
log.Debugf("Now is: %v", now)
onePeriodAgo := now.Add(time.Duration(-1*per) * time.Second)
log.Debugf("Then is: %v", onePeriodAgo)
client := r.singleton()
var zrange *redis.StringSliceCmd
pipeFn := func(pipe redis.Pipeliner) error {
pipe.ZRemRangeByScore(keyName, "-inf", strconv.Itoa(int(onePeriodAgo.UnixNano())))
zrange = pipe.ZRange(keyName, 0, -1)
element := redis.Z{
Score: float64(now.UnixNano()),
}
if valueOverride != "-1" {
element.Member = valueOverride
} else {
element.Member = strconv.Itoa(int(now.UnixNano()))
}
pipe.ZAdd(keyName, &element)
pipe.Expire(keyName, time.Duration(per)*time.Second)
return nil
}
var err error
if pipeline {
_, err = client.Pipelined(pipeFn)
} else {
_, err = client.TxPipelined(pipeFn)
}
if err != nil {
log.Errorf("Multi command failed: %s", err.Error())
return 0, nil
}
values := zrange.Val()
// Check actual value
if values == nil {
return 0, nil
}
intVal := len(values)
result := make([]interface{}, len(values))
for i, v := range values {
result[i] = v
}
log.Debugf("Returned: %d", intVal)
return intVal, result
}
// GetRollingWindow returns the current rolling window of values.
func (r RedisCluster) GetRollingWindow(keyName string, per int64, pipeline bool) (int, []interface{}) {
if err := r.up(); err != nil {
log.Debug(err.Error())
return 0, nil
}
now := time.Now()
onePeriodAgo := now.Add(time.Duration(-1*per) * time.Second)
client := r.singleton()
var zrange *redis.StringSliceCmd
pipeFn := func(pipe redis.Pipeliner) error {
pipe.ZRemRangeByScore(keyName, "-inf", strconv.Itoa(int(onePeriodAgo.UnixNano())))
zrange = pipe.ZRange(keyName, 0, -1)
return nil
}
var err error
if pipeline {
_, err = client.Pipelined(pipeFn)
} else {
_, err = client.TxPipelined(pipeFn)
}
if err != nil {
log.Errorf("Multi command failed: %s", err.Error())
return 0, nil
}
values := zrange.Val()
// Check actual value
if values == nil {
return 0, nil
}
intVal := len(values)
result := make([]interface{}, intVal)
for i, v := range values {
result[i] = v
}
log.Debugf("Returned: %d", intVal)
return intVal, result
}
// GetKeyPrefix returns storage key prefix.
func (r *RedisCluster) GetKeyPrefix() string {
return r.KeyPrefix
}
// AddToSortedSet adds value with given score to sorted set identified by keyName.
func (r *RedisCluster) AddToSortedSet(keyName, value string, score float64) {
fixedKey := r.fixKey(keyName)
log.Debug("Pushing raw key to sorted set", log.String("keyName", keyName), log.String("fixedKey", fixedKey))
if err := r.up(); err != nil {
log.Debug(err.Error())
return
}
member := redis.Z{Score: score, Member: value}
if err := r.singleton().ZAdd(fixedKey, &member).Err(); err != nil {
log.Error(
"ZADD command failed",
log.String("keyName", keyName),
log.String("fixedKey", fixedKey),
log.String("error", err.Error()),
)
}
}
// GetSortedSetRange gets a range of elements from the sorted set identified by keyName.
func (r *RedisCluster) GetSortedSetRange(keyName, scoreFrom, scoreTo string) ([]string, []float64, error) {
fixedKey := r.fixKey(keyName)
log.Debug(
"Getting sorted set range",
log.String(
"keyName",
keyName,
),
log.String("fixedKey", fixedKey),
log.String("scoreFrom", scoreFrom),
log.String("scoreTo", scoreTo),
)
args := redis.ZRangeBy{Min: scoreFrom, Max: scoreTo}
values, err := r.singleton().ZRangeByScoreWithScores(fixedKey, &args).Result()
if err != nil {
log.Error(
"ZRANGEBYSCORE command failed",
log.String(
"keyName",
keyName,
),
log.String("fixedKey", fixedKey),
log.String("scoreFrom", scoreFrom),
log.String("scoreTo", scoreTo),
log.String("error", err.Error()),
)
return nil, nil, err
}
if len(values) == 0 {
return nil, nil, nil
}
elements := make([]string, len(values))
scores := make([]float64, len(values))
for i, v := range values {
elements[i] = fmt.Sprint(v.Member)
scores[i] = v.Score
}
return elements, scores, nil
}
// RemoveSortedSetRange removes a range of elements from the sorted set identified by keyName.
func (r *RedisCluster) RemoveSortedSetRange(keyName, scoreFrom, scoreTo string) error {
fixedKey := r.fixKey(keyName)
log.Debug(
"Removing sorted set range",
log.String(
"keyName",
keyName,
),
log.String("fixedKey", fixedKey),
log.String("scoreFrom", scoreFrom),
log.String("scoreTo", scoreTo),
)
if err := r.singleton().ZRemRangeByScore(fixedKey, scoreFrom, scoreTo).Err(); err != nil {
log.Debug(
"ZREMRANGEBYSCORE command failed",
log.String("keyName", keyName),
log.String("fixedKey", fixedKey),
log.String("scoreFrom", scoreFrom),
log.String("scoreTo", scoreTo),
log.String("error", err.Error()),
)
return err
}
return nil
}
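Putting the client model together, here is a minimal usage sketch (my own wiring, not part of the package): start the background connector, wait until the health check reports the connection as up, then write and read a prefixed key through RedisCluster. The storage import path matches the one used by the exporter in the next section.
package main
import (
"context"
"fmt"
"time"
"github.com/marmotedu/iam/pkg/storage"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// ConnectToRedis blocks and retries once per second, so run it in its own goroutine.
go storage.ConnectToRedis(ctx, &storage.Config{Host: "127.0.0.1", Port: 6379})
// Wait until the periodic health check marks redis as up.
for !storage.Connected() {
time.Sleep(100 * time.Millisecond)
}
rc := &storage.RedisCluster{KeyPrefix: "demo-"}
if err := rc.SetKey("hello", "world", 10*time.Second); err != nil {
panic(err)
}
val, _ := rc.GetKey("hello") // actually reads "demo-hello"
fmt.Println(val)             // world
}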
二、Batch log export in practice: the exporter model
How it works
The user configures the exporter's buffer size and its flush interval; when the buffer reaches the size threshold, or the interval elapses, the buffered data is exported to the configured storage middleware.
To keep the exporter's buffered channel from filling up too quickly, it starts several workers internally that consume messages from the channel in parallel (see the wiring sketch below).
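Before walking through the code, here is a minimal wiring sketch. It is my own illustration, not part of the package: the AnalyticsOptions field names are assumed to match what NewAnalytics reads below (PoolSize, RecordsBufferSize, FlushInterval), and RedisCluster from the previous section is assumed to satisfy storage.AnalyticsHandler, since it provides the two methods the exporter calls (Connect and AppendToSetPipelined).
package main
import (
"time"
"github.com/marmotedu/iam/pkg/analytics" // import path assumed for this sketch
"github.com/marmotedu/iam/pkg/storage"
)
func main() {
// Hypothetical option values; the field names mirror what NewAnalytics reads.
opts := &analytics.AnalyticsOptions{
PoolSize:          4,    // four workers drain the channel in parallel
RecordsBufferSize: 2000, // each worker flushes at 2000/4 = 500 records
FlushInterval:     200,  // ms a worker waits before flushing a partial buffer
}
a := analytics.NewAnalytics(opts, &storage.RedisCluster{KeyPrefix: "analytics-"})
a.Start()
defer a.Stop() // stop intake, close the channel, wait for workers to drain
record := &analytics.AnalyticsRecord{
TimeStamp: time.Now().Unix(),
Username:  "colin",
Effect:    "allow",
}
record.SetExpiry(3600) // keep this record for an hour
_ = a.RecordHit(record)
}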
Code logic
The code
package analytics
import (
"sync"
"sync/atomic"
"time"
"github.com/vmihailenco/msgpack/v5"
"github.com/marmotedu/iam/pkg/log"
"github.com/marmotedu/iam/pkg/storage"
)
const analyticsKeyName = "iam-system-analytics"
const (
recordsBufferForcedFlushInterval = 1 * time.Second
)
// AnalyticsRecord encodes the details of an authorization request.
type AnalyticsRecord struct {
TimeStamp int64 `json:"timestamp"`
Username string `json:"username"`
Effect string `json:"effect"`
Conclusion string `json:"conclusion"`
Request string `json:"request"`
Policies string `json:"policies"`
Deciders string `json:"deciders"`
ExpireAt time.Time `json:"expireAt" bson:"expireAt"`
}
var analytics *Analytics
// SetExpiry sets the expiration time of the record.
func (a *AnalyticsRecord) SetExpiry(expiresInSeconds int64) {
expiry := time.Duration(expiresInSeconds) * time.Second
if expiresInSeconds == 0 {
// Expiry is set to 100 years
expiry = 24 * 365 * 100 * time.Hour
}
t := time.Now()
t2 := t.Add(expiry)
a.ExpireAt = t2
}
// Analytics will record analytics data to a redis back end as defined in the Config object.
type Analytics struct {
store storage.AnalyticsHandler
poolSize int
recordsChan chan *AnalyticsRecord
workerBufferSize uint64
recordsBufferFlushInterval uint64
shouldStop uint32
poolWg sync.WaitGroup
}
// NewAnalytics returns a new analytics instance.
func NewAnalytics(options *AnalyticsOptions, store storage.AnalyticsHandler) *Analytics {
ps := options.PoolSize
recordsBufferSize := options.RecordsBufferSize
workerBufferSize := recordsBufferSize / uint64(ps)
log.Debug("Analytics pool worker buffer size", log.Uint64("workerBufferSize", workerBufferSize))
recordsChan := make(chan *AnalyticsRecord, recordsBufferSize)
analytics = &Analytics{
store: store,
poolSize: ps,
recordsChan: recordsChan,
workerBufferSize: workerBufferSize,
recordsBufferFlushInterval: options.FlushInterval,
}
return analytics
}
// GetAnalytics returns the existing analytics instance.
// Need to initialize `analytics` instance before calling GetAnalytics.
func GetAnalytics() *Analytics {
return analytics
}
// Start starts the analytics service.
func (r *Analytics) Start() {
r.store.Connect()
// start worker pool
atomic.SwapUint32(&r.shouldStop, 0)
for i := 0; i < r.poolSize; i++ {
r.poolWg.Add(1)
go r.recordWorker()
}
}
// A graceful shutdown of the exporter must do three things:
// 1. Use the shouldStop flag so that no new records are accepted.
// 2. Close the channel so every worker knows to drain the remaining data.
// 3. Wait on the WaitGroup so Stop only returns after in-flight data is processed.
func (r *Analytics) Stop() {
// flag to stop sending records into channel
atomic.SwapUint32(&r.shouldStop, 1)
// close channel to stop workers
close(r.recordsChan)
// wait for all workers to be done
r.poolWg.Wait()
}
// RecordHit will store an AnalyticsRecord in Redis.
func (r *Analytics) RecordHit(record *AnalyticsRecord) error {
// check if we should stop sending records 1st
if atomic.LoadUint32(&r.shouldStop) > 0 {
return nil
}
// just send record to channel consumed by pool of workers
// leave all data crunching and Redis I/O work for pool workers
r.recordsChan <- record
return nil
}
func (r *Analytics) recordWorker() {
defer r.poolWg.Done()
// this buffer is used to send one pipelined command to redis
// use r.workerBufferSize as cap to reduce slice re-allocations
recordsBuffer := make([][]byte, 0, r.workerBufferSize)
// read records from channel and process
lastSentTS := time.Now()
for {
var readyToSend bool
select {
case record, ok := <-r.recordsChan:
// check if channel was closed and it is time to exit from worker
if !ok {
// send what is left in buffer
r.store.AppendToSetPipelined(analyticsKeyName, recordsBuffer)
return
}
// we have new record - prepare it and add to buffer
if encoded, err := msgpack.Marshal(record); err != nil {
log.Errorf("Error encoding analytics data: %s", err.Error())
} else {
recordsBuffer = append(recordsBuffer, encoded)
}
// identify that buffer is ready to be sent
readyToSend = uint64(len(recordsBuffer)) == r.workerBufferSize
case <-time.After(time.Duration(r.recordsBufferFlushInterval) * time.Millisecond):
// nothing was received for that period of time
// anyways send whatever we have, don't hold data too long in buffer
readyToSend = true
}
// send data to Redis and reset buffer
if len(recordsBuffer) > 0 && (readyToSend || time.Since(lastSentTS) >= recordsBufferForcedFlushInterval) {
r.store.AppendToSetPipelined(analyticsKeyName, recordsBuffer)
recordsBuffer = recordsBuffer[:0]
lastSentTS = time.Now()
}
}
}
// DurationToMillisecond converts a time.Duration to milliseconds (float64).
func DurationToMillisecond(d time.Duration) float64 {
return float64(d) / 1e6
}