启动流程的图如下:
1、主函数入口
ETCD 启动的入口在 etcd/server/main.go
文件中。
package main
import (
"os"
"go.etcd.io/etcd/server/v3/etcdmain"
)
func main() {
etcdmain.Main(os.Args)
}
这里调用了 etcdmain.Main()
,这是 ETCD 的实际启动逻辑。
2、etcdmain.Main(os.Args)
详解
代码源文件:etcd/server/v3/etcdmain.main.go
func Main(args []string) {
// 1. 检查系统架构支持
checkSupportArch()
if len(args) > 1 { // // 2. 检查命令行参数
cmd := args[1]
switch cmd {
case "gateway", "grpc-proxy": // // 3. 判断是否运行网关或 gRPC 代理模式
if err := rootCmd.Execute(); err != nil {
fmt.Fprint(os.Stderr, err)
os.Exit(1)
}
return
}
}
startEtcdOrProxyV2(args) // // 4. 启动普通 ETCD 节点或代理节点
}
2.1.checkSupportArch()
- 功能:检查当前运行的系统架构是否被 ETCD 支持。
- 目的:确保程序在不支持的架构上不会运行,例如特定 ARM 版本可能不被支持。
2.2. 检查命令行参数:
args
是启动时传递的命令行参数,args[0]
通常是可执行文件名,args[1]
是实际命令。
2.3. 启动不同模式:
- 如果命令为
"gateway"
或"grpc-proxy"
,表示要运行 ETCD 的网关或 gRPC 代理模式。 - 执行
rootCmd.Execute()
,启动对应的命令逻辑。 rootCmd
是 Cobra 框架定义的根命令,包含所有子命令和配置。
2.4. 启动普通节点或代理节点:
-
如果没有匹配到特殊命令,调用
startEtcdOrProxyV2(args)
。 -
功能:根据配置和参数,决定是启动标准 ETCD 节点还是运行代理模式。
接下来可以深入分析
startEtcdOrProxyV2
函数,该函数负责实际启动 ETCD 实例或代理模式。它会加载配置、初始化组件,并启动 Raft、MVCC 等核心模块。
3、startEtcdOrProxyV2(args)
详解
func startEtcdOrProxyV2(args []string) {
// 禁用 gRPC 跟踪,优化性能
grpc.EnableTracing = false
// 创建一个新的配置对象
cfg := newConfig()
// 保存初始集群配置,方便后续处理
defaultInitialCluster := cfg.ec.InitialCluster
// 解析传入的命令行参数,将配置赋值给 cfg
err := cfg.parse(args[1:])
// 初始化日志记录器,确保所有输出都写入日志。
lg := cfg.ec.GetLogger()
// If we failed to parse the whole configuration, print the error using
// preferably the resolved logger from the config,
// but if does not exists, create a new temporary logger.
if lg == nil {
var zapError error
// use this logger
lg, zapError = logutil.CreateDefaultZapLogger(zap.InfoLevel)
if zapError != nil {
fmt.Printf("error creating zap logger %v", zapError)
os.Exit(1)
}
}
lg.Info("Running: ", zap.Strings("args", args))
if err != nil {
lg.Warn("failed to verify flags", zap.Error(err))
switch {
case errorspkg.Is(err, embed.ErrUnsetAdvertiseClientURLsFlag):
lg.Warn("advertise client URLs are not set", zap.Error(err))
}
os.Exit(1)
}
cfg.ec.SetupGlobalLoggers()
defer func() {
logger := cfg.ec.GetLogger()
if logger != nil {
logger.Sync()
}
}()
// 解析默认集群地址
defaultHost, dhErr := (&cfg.ec).UpdateDefaultClusterFromName(defaultInitialCluster)
if defaultHost != "" {
lg.Info(
"detected default host for advertise",
zap.String("host", defaultHost),
)
}
if dhErr != nil {
lg.Info("failed to detect default host", zap.Error(dhErr))
}
// 设置数据目录 如果未指定数据目录,使用默认目录
if cfg.ec.Dir == "" {
cfg.ec.Dir = fmt.Sprintf("%v.etcd", cfg.ec.Name)
lg.Warn(
"'data-dir' was empty; using default",
zap.String("data-dir", cfg.ec.Dir),
)
}
var stopped <-chan struct{}
var errc <-chan error
// 判断数据目录类型并启动
which := identifyDataDirOrDie(cfg.ec.GetLogger(), cfg.ec.Dir)
if which != dirEmpty {
lg.Info(
"server has already been initialized",
zap.String("data-dir", cfg.ec.Dir),
zap.String("dir-type", string(which)),
)
switch which {
// 启动etcd
case dirMember:
stopped, errc, err = startEtcd(&cfg.ec) // 启动 ETCD 服务
case dirProxy:
lg.Panic("v2 http proxy has already been deprecated in 3.6", zap.String("dir-type", string(which)))
default:
lg.Panic(
"unknown directory type",
zap.String("dir-type", string(which)),
)
}
} else {
lg.Info(
"Initialize and start etcd server",
zap.String("data-dir", cfg.ec.Dir),
zap.String("dir-type", string(which)),
)
stopped, errc, err = startEtcd(&cfg.ec)
}
// 处理启动错误
if err != nil {
// ... 省略部分代码
lg.Fatal("discovery failed", zap.Error(err))
}
// 信号处理与退出
osutil.HandleInterrupts(lg)
// At this point, the initialization of etcd is done.
// The listeners are listening on the TCP ports and ready
// for accepting connections. The etcd instance should be
// joined with the cluster and ready to serve incoming
// connections.
notifySystemd(lg)
select {
case lerr := <-errc:
//2 fatal out on listener errors
lg.Fatal("listener failed", zap.Error(lerr))
case <-stopped:
}
osutil.Exit(0)
}
3.1. 关键调用
startEtcd(&cfg.ec)
:这是实际启动 ETCD 服务的函数,负责初始化和启动所有必要组件。
identifyDataDirOrDie()
:检查数据目录类型,决定是启动服务还是退出。
4、startEtcd(&cfg.ec)详解
// startEtcd runs StartEtcd in addition to hooks needed for standalone etcd.
func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
// 是启动 ETCD 服务的核心函数。它负责初始化 ETCD 实例、注册中断信号处理,
// 并等待服务成功启动或异常停止。
// 最后,返回相关的停止和错误通道
e, err := embed.StartEtcd(cfg)
// 启动失败,立即返回错误
if err != nil {
return nil, nil, err
}
// 注册中断信号处理
osutil.RegisterInterruptHandler(e.Close)
// 等待服务状态
select {
case <-e.Server.ReadyNotify(): // wait for e.Server to join the cluster
case <-e.Server.StopNotify(): // publish aborted from 'ErrStopped'
}
return e.Server.StopNotify(), e.Err(), nil
}
4.1 embed.StartEtcd(cfg)
根据传入的配置 cfg
,启动一个嵌入式 ETCD 实例。
返回值 e
:embed.Etcd
实例,封装了 ETCD 服务的所有组件,包括 Raft 节点、存储引擎、gRPC 和 HTTP 接口。
错误处理:如果启动失败,立即返回错误。
4.2 RegisterInterruptHandler
- 作用:监听系统中断信号(如
SIGINT
、SIGTERM
),确保在接收到中断信号时,调用e.Close()
优雅地关闭 ETCD 服务。 e.Close
:关闭所有资源(网络连接、存储、Raft 等)。
5、embed.StartEtcd(cfg)详解
// StartEtcd launches the etcd server and HTTP handlers for client/server communication.
// The returned Etcd.Server is not guaranteed to have joined the cluster. Wait
// on the Etcd.Server.ReadyNotify() channel to know when it completes and is ready for use.
func StartEtcd(inCfg *Config) (e *Etcd, err error) {
// 1.配置验证
if err = inCfg.Validate(); err != nil {
return nil, err
}
serving := false
// 初始化 ETCD 实例
e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}
cfg := &e.cfg
// 如果服务未成功启动,则关闭所有资源和监听器
defer func() {
if e == nil || err == nil {
return
}
if !serving {
// errored before starting gRPC server for serveCtx.serversC
for _, sctx := range e.sctxs {
close(sctx.serversC)
}
}
e.Close()
e = nil
}()
if !cfg.SocketOpts.Empty() {
cfg.logger.Info(
"configuring socket options",
zap.Bool("reuse-address", cfg.SocketOpts.ReuseAddress),
zap.Bool("reuse-port", cfg.SocketOpts.ReusePort),
)
}
e.cfg.logger.Info(
"configuring peer listeners",
zap.Strings("listen-peer-urls", e.cfg.getListenPeerURLs()),
)
// 配置 Peer(节点间通信)监听器
if e.Peers, err = configurePeerListeners(cfg); err != nil {
return e, err
}
e.cfg.logger.Info(
"configuring client listeners",
zap.Strings("listen-client-urls", e.cfg.getListenClientURLs()),
)
// 配置 Client(客户端通信)监听器
if e.sctxs, err = configureClientListeners(cfg); err != nil {
return e, err
}
for _, sctx := range e.sctxs {
e.Clients = append(e.Clients, sctx.l)
}
var (
urlsmap types.URLsMap
token string
)
// 初始化集群信息
memberInitialized := true
if !isMemberInitialized(cfg) {
memberInitialized = false
urlsmap, token, err = cfg.PeerURLsMapAndToken("etcd")
if err != nil {
return e, fmt.Errorf("error setting up initial cluster: %w", err)
}
}
// AutoCompactionRetention defaults to "0" if not set.
if len(cfg.AutoCompactionRetention) == 0 {
cfg.AutoCompactionRetention = "0"
}
// 设置存储和压缩选项
autoCompactionRetention, err := parseCompactionRetention(cfg.AutoCompactionMode, cfg.AutoCompactionRetention)
if err != nil {
return e, err
}
backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType)
srvcfg := config.ServerConfig{
Name: cfg.Name,
ClientURLs: cfg.AdvertiseClientUrls,
PeerURLs: cfg.AdvertisePeerUrls,
DataDir: cfg.Dir,
DedicatedWALDir: cfg.WalDir,
SnapshotCount: cfg.SnapshotCount,
SnapshotCatchUpEntries: cfg.SnapshotCatchUpEntries,
MaxSnapFiles: cfg.MaxSnapFiles,
MaxWALFiles: cfg.MaxWalFiles,
InitialPeerURLsMap: urlsmap,
InitialClusterToken: token,
DiscoveryURL: cfg.Durl,
DiscoveryProxy: cfg.Dproxy,
DiscoveryCfg: cfg.DiscoveryCfg,
NewCluster: cfg.IsNewCluster(),
PeerTLSInfo: cfg.PeerTLSInfo,
TickMs: cfg.TickMs,
ElectionTicks: cfg.ElectionTicks(),
InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
AutoCompactionRetention: autoCompactionRetention,
AutoCompactionMode: cfg.AutoCompactionMode,
QuotaBackendBytes: cfg.QuotaBackendBytes,
BackendBatchLimit: cfg.BackendBatchLimit,
BackendFreelistType: backendFreelistType,
BackendBatchInterval: cfg.BackendBatchInterval,
MaxTxnOps: cfg.MaxTxnOps,
MaxRequestBytes: cfg.MaxRequestBytes,
MaxConcurrentStreams: cfg.MaxConcurrentStreams,
SocketOpts: cfg.SocketOpts,
StrictReconfigCheck: cfg.StrictReconfigCheck,
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
AuthToken: cfg.AuthToken,
BcryptCost: cfg.BcryptCost,
TokenTTL: cfg.AuthTokenTTL,
CORS: cfg.CORS,
HostWhitelist: cfg.HostWhitelist,
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
CompactHashCheckEnabled: cfg.ExperimentalCompactHashCheckEnabled,
CompactHashCheckTime: cfg.ExperimentalCompactHashCheckTime,
PreVote: cfg.PreVote,
Logger: cfg.logger,
ForceNewCluster: cfg.ForceNewCluster,
EnableGRPCGateway: cfg.EnableGRPCGateway,
ExperimentalEnableDistributedTracing: cfg.ExperimentalEnableDistributedTracing,
UnsafeNoFsync: cfg.UnsafeNoFsync,
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
CompactionSleepInterval: cfg.ExperimentalCompactionSleepInterval,
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
WarningUnaryRequestDuration: cfg.WarningUnaryRequestDuration,
ExperimentalMemoryMlock: cfg.ExperimentalMemoryMlock,
ExperimentalTxnModeWriteWithSharedBuffer: cfg.ExperimentalTxnModeWriteWithSharedBuffer,
ExperimentalBootstrapDefragThresholdMegabytes: cfg.ExperimentalBootstrapDefragThresholdMegabytes,
ExperimentalMaxLearners: cfg.ExperimentalMaxLearners,
V2Deprecation: cfg.V2DeprecationEffective(),
ExperimentalLocalAddress: cfg.InferLocalAddr(),
ServerFeatureGate: cfg.ServerFeatureGate,
}
if srvcfg.ExperimentalEnableDistributedTracing {
tctx := context.Background()
tracingExporter, terr := newTracingExporter(tctx, cfg)
if terr != nil {
return e, terr
}
e.tracingExporterShutdown = func() {
tracingExporter.Close(tctx)
}
srvcfg.ExperimentalTracerOptions = tracingExporter.opts
e.cfg.logger.Info(
"distributed tracing setup enabled",
)
}
srvcfg.PeerTLSInfo.LocalAddr = srvcfg.ExperimentalLocalAddress
print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
// 创建服务实例
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
return e, err
}
// buffer channel so goroutines on closed connections won't wait forever
e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
// newly started member ("memberInitialized==false")
// does not need corruption check
if memberInitialized && srvcfg.ServerFeatureGate.Enabled(features.InitialCorruptCheck) {
if err = e.Server.CorruptionChecker().InitialCheck(); err != nil {
// set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()"
// (nothing to close since rafthttp transports have not been started)
e.cfg.logger.Error("checkInitialHashKV failed", zap.Error(err))
e.Server.Cleanup()
e.Server = nil
return e, err
}
}
// 启动服务
e.Server.Start()
e.servePeers()
e.serveClients()
// 用于暴露 ETCD 的 监控指标
if err = e.serveMetrics(); err != nil {
return e, err
}
e.cfg.logger.Info(
"now serving peer/client/metrics",
zap.String("local-member-id", e.Server.MemberID().String()),
zap.Strings("initial-advertise-peer-urls", e.cfg.getAdvertisePeerURLs()),
zap.Strings("listen-peer-urls", e.cfg.getListenPeerURLs()),
zap.Strings("advertise-client-urls", e.cfg.getAdvertiseClientURLs()),
zap.Strings("listen-client-urls", e.cfg.getListenClientURLs()),
zap.Strings("listen-metrics-urls", e.cfg.getMetricsURLs()),
)
serving = true
return e, nil
}
5.1. 配置验证
验证配置是否合法。例如,检查必填项是否缺失
5.2. 初始化 ETCD 实例
创建一个新的 ETCD 实例,并初始化用于控制服务停止的通道 stopc
。
5.3.延迟清理(defer
块)
在函数退出时,确保资源被正确释放。如果服务未成功启动,则关闭所有资源和监听器。
5.4. 配置网络监听器
配置 Peer(节点间通信)监听器
为节点间通信设置监听器,方便集群内部通过 Raft 协议进行通信。
配置 Client(客户端通信)监听器:
为客户端请求设置监听器,处理外部对 ETCD 的访问。
5.5.初始化集群信息
如果当前节点是新成员,则生成初始集群信息,包括节点 URL 和集群令牌。
5.6.设置存储和压缩选项:
设置数据压缩和存储的保留策略,以控制存储空间占用。
5.7.创建服务实例
基于配置创建 Raft 节点和存储服务。
5.8.启动服务
e.Server.Start()
e.servePeers()
e.serveClients()
启动 Raft 节点,监听节点间和客户端的请求。
5.9 记录启动完成日志并返回结果
- 记录服务启动成功的信息,包括节点 ID 和监听的 URL。
- 返回已启动的 ETCD 实例