configurePeerListeners
是 ETCD 的一个核心函数,用于为集群中节点之间的通信配置监听器(Peer Listener)。这些监听器主要负责 Raft 协议的消息传递、日志复制等功能。函数返回一个包含所有监听器的列表。
函数签名
func configurePeerListeners(cfg *Config) (peers []*peerListener, err error)
- 输入参数:
cfg *Config
:指向Config
配置结构体的指针,包含监听器所需的所有配置信息。
- 返回值:
peers []*peerListener
:返回一个peerListener
的切片,表示为每个 Peer 配置的监听器。err error
:若配置过程中发生错误,返回详细错误信息。
1. 更新加密套件配置
if err = updateCipherSuites(&cfg.PeerTLSInfo, cfg.CipherSuites); err != nil {
return nil, err
}
- 作用:更新 TLS 的加密套件(Cipher Suites)。
- 逻辑:
- 使用
cfg.CipherSuites
更新cfg.PeerTLSInfo
的加密配置。 - 如果更新失败,返回错误。
- 使用
2. 配置自签名证书
if err = cfg.PeerSelfCert(); err != nil {
cfg.logger.Fatal("failed to get peer self-signed certs", zap.Error(err))
}
- 作用:为 Peer 生成自签名证书(如果未提供证书文件)。
- 逻辑:
- 调用
cfg.PeerSelfCert()
方法生成自签名证书。 - 如果生成失败,记录错误日志并终止程序。
- 调用
3. 更新 TLS 版本
updateMinMaxVersions(&cfg.PeerTLSInfo, cfg.TlsMinVersion, cfg.TlsMaxVersion)
- 作用:更新 TLS 的最小和最大版本。
- 逻辑:
- 使用配置中的
TlsMinVersion
和TlsMaxVersion
更新cfg.PeerTLSInfo
。 - 确保使用的 TLS 版本在允许范围内。
- 使用配置中的
4. 检查是否启用 TLS
if !cfg.PeerTLSInfo.Empty() {
cfg.logger.Info(
"starting with peer TLS",
zap.String("tls-info", fmt.Sprintf("%+v", cfg.PeerTLSInfo)),
zap.Strings("cipher-suites", cfg.CipherSuites),
)
}
- 作用:检查 Peer 是否启用了 TLS。
- 逻辑:
- 如果
cfg.PeerTLSInfo
不为空,记录日志说明已启用 TLS 并打印配置信息。
- 如果
5. 初始化监听器切片
peers = make([]*peerListener, len(cfg.ListenPeerUrls))
- 作用:为每个 Peer URL 初始化对应的监听器。
- 逻辑:
- 根据
cfg.ListenPeerUrls
的长度,创建一个peerListener
的切片peers
,用于存储所有监听器。
- 根据
6. 错误处理回滚逻辑
defer func() {
if err == nil {
return
}
for i := range peers {
if peers[i] != nil && peers[i].close != nil {
cfg.logger.Warn(
"closing peer listener",
zap.String("address", cfg.ListenPeerUrls[i].String()),
zap.Error(err),
)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
peers[i].close(ctx)
cancel()
}
}
}()
- 作用:在监听器创建过程中发生错误时,关闭已经创建的监听器,清理资源。
- 逻辑:
- 如果
err != nil
,遍历peers
,调用每个监听器的close
方法,确保释放资源。 - 使用超时时间
1 秒
防止阻塞。
- 如果
7. 为每个 Peer URL 创建监听器
for i, u := range cfg.ListenPeerUrls {
if u.Scheme == "http" {
if !cfg.PeerTLSInfo.Empty() {
cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("peer-url", u.String()))
}
if cfg.PeerTLSInfo.ClientCertAuth {
cfg.logger.Warn("scheme is HTTP while --peer-client-cert-auth is enabled; ignoring client cert auth for this URL", zap.String("peer-url", u.String()))
}
}
peers[i] = &peerListener{close: func(context.Context) error { return nil }}
peers[i].Listener, err = transport.NewListenerWithOpts(u.Host, u.Scheme,
transport.WithTLSInfo(&cfg.PeerTLSInfo),
transport.WithSocketOpts(&cfg.SocketOpts),
transport.WithTimeout(rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout),
)
if err != nil {
cfg.logger.Error("creating peer listener failed", zap.Error(err))
return nil, err
}
// once serve, overwrite with 'http.Server.Shutdown'
peers[i].close = func(context.Context) error {
return peers[i].Listener.Close()
}
}
- 主要逻辑:
- 遍历
cfg.ListenPeerUrls
,为每个 URL 创建一个监听器。 - HTTP 检查:
- 如果 URL 的协议是 HTTP 且启用了 TLS,发出警告日志。
- 如果启用了
PeerTLSInfo.ClientCertAuth
,但协议为 HTTP,也发出警告。
- 创建监听器:
- 调用
transport.NewListenerWithOpts
创建监听器。 - 配置 TLS 信息、套接字选项、连接超时时间等。
- 调用
- 错误处理:
- 如果监听器创建失败,记录错误日志并返回错误。
- 关闭逻辑:
- 设置
peerListener
的close
方法,用于在关闭监听器时释放资源。
- 设置
- 遍历
8. 返回监听器
return peers, nil
- 作用:返回配置完成的监听器切片。
- 如果没有错误,
err
为nil
,peers
包含所有配置好的监听器。
总结:
- 功能:
configurePeerListeners
函数为集群中每个 Peer URL 配置监听器,用于处理节点间通信。 - 关键点:
- 配置 TLS,包括加密套件、TLS 版本、自签名证书等。
- 创建监听器并设置关闭逻辑。
- 错误回滚机制,确保在发生错误时释放已分配的资源。
- 支持 HTTP 和 HTTPS 两种协议,同时发出适当的警告。
- 核心调用:
transport.NewListenerWithOpts
是监听器创建的核心方法,它根据配置初始化实际的监听器。