1. 如何选择节点
1.1. 确定slot
1.1.1. 通过cmdSlot
方法确定在哪个槽上, 这一步只是本地计算
首先入口方法_process,先通过cmdSlot
方法用key计算此次应该落在哪个槽上
通过
crc16sum
算法计算key应该属于哪个槽,slotNumber为16384
func Slot(key string) int {
if key == "" {
return RandomSlot()
}
key = Key(key)
return int(crc16sum(key)) % slotNumber
}
1.2. 选取节点的核心方法
1.2.1. cmdNode内部实现
1.2.2. slotReadOnlyNode方法实现
func (c *ClusterClient) slotReadOnlyNode(state *clusterState, slot int) (*clusterNode, error) {
if c.opt.RouteByLatency {
return state.slotClosestNode(slot)
}
if c.opt.RouteRandomly {
return state.slotRandomNode(slot)
}
return state.slotSlaveNode(slot)
}
2. 如何获取所有节点
2.1. c.state.Get(ctx)方法,获取所有节点
/*
如果之前没有获取过状态信息,调用Reload方法来重新加载状态信息
如果之前已经获取过状态信息,并且距离上次获取状态信息的时间超过10秒,那么会进行异步重新加载
*/
func (c *clusterStateHolder) Get(ctx context.Context) (*clusterState, error) {
v := c.state.Load()
if v == nil {
return c.Reload(ctx)
}
state := v.(*clusterState)
if time.Since(state.createdAt) > 10*time.Second {
c.LazyReload()
}
return state, nil
}
2.1.1. 第一次时,调用reload
2.1.1.1. reload方法
reload方法内,核心方法为c.load(ctx)
, 而load方法,为初始化时通过newClusterStateHolder
设置进来的fn
2.1.1.2. fn如下,为核心获取所有节点的c.loadState
方法
2.1.2. 后续调用LazyReload
func (c *clusterStateHolder) LazyReload() {
if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
return
}
go func() {
defer atomic.StoreUint32(&c.reloading, 0)
_, err := c.Reload(context.Background())
if err != nil {
return
}
time.Sleep(200 * time.Millisecond)
}()
}
LazyReload
内部还是调用c.Reload
方法
2.2. 核心方法:获取所有节点loadState
方法实现
2.2.1. 核心为通过node.Client.ClusterSlots(ctx)
命令查询所有节点信息
// zhmark 2024/7/3 核心获取所有节点信息的方法
func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {
if c.opt.ClusterSlots != nil {
slots, err := c.opt.ClusterSlots(ctx)
if err != nil {
return nil, err
}
return newClusterState(c.nodes, slots, "")
}
addrs, err := c.nodes.Addrs()
if err != nil {
return nil, err
}
var firstErr error
// 从配置的addrs里,随机选择一个节点,执行查询所有节点
for _, idx := range rand.Perm(len(addrs)) {
addr := addrs[idx]
node, err := c.nodes.GetOrCreate(addr)
if err != nil {
if firstErr == nil {
firstErr = err
}
continue
}
slots, err := node.Client.ClusterSlots(ctx).Result()
if err != nil {
if firstErr == nil {
firstErr = err
}
continue
}
return newClusterState(c.nodes, slots, node.Client.opt.Addr)
}
/*
* No node is connectable. It's possible that all nodes' IP has changed.
* Clear activeAddrs to let client be able to re-connect using the initial
* setting of the addresses (e.g. [redis-cluster-0:6379, redis-cluster-1:6379]),
* which might have chance to resolve domain name and get updated IP address.
*/
c.nodes.mu.Lock()
c.nodes.activeAddrs = nil
c.nodes.mu.Unlock()
return nil, firstErr
}
通过调用cluster slots
获取所有的节点信息
func (c cmdable) ClusterSlots(ctx context.Context) *ClusterSlotsCmd {
cmd := NewClusterSlotsCmd(ctx, "cluster", "slots")
_ = c(ctx, cmd)
return cmd
}