本篇分析 配置中心模块 启动流程:
先看启动配置参数:
进入方法:
先看看配置中心服务数据模型:初始化也是围绕着下面各个属性赋值...
// Server 配置中心核心服务
type Server struct {
cfg *Config
storage store.Store
fileCache cachetypes.ConfigFileCache
groupCache cachetypes.ConfigGroupCache
grayCache cachetypes.GrayCache
caches cachetypes.CacheManager
watchCenter *watchCenter
namespaceOperator namespace.NamespaceOperateServer
initialized bool
history plugin.History
cryptoManager plugin.CryptoManager
hooks []ResourceHook
// chains
chains *ConfigChains
sequence int64
}
初始化有幂等性,保证初始化一次;
和服务发现模块一样,也有服务代理proxySvr
,也保存了原始的服务originServer
实例对象:
// Initialize 初始化配置中心模块
func Initialize(ctx context.Context, config Config, s store.Store, cacheMgr cachetypes.CacheManager,
namespaceOperator namespace.NamespaceOperateServer) error {
if originServer.initialized {
return nil
}
proxySvr, originSvr, err := doInitialize(ctx, config, s, cacheMgr, namespaceOperator)
if err != nil {
return err
}
originServer = originSvr
server = proxySvr
return nil
}
所以我们去看看是如何doInitialize(...)
的:
再看上图中的第118行:originSvr.initialize(...)
: 点进去:
1.看看watchCenter
结构体:
type SubscribtionContext struct {
subID string
cancel context.CancelFunc
}
WatchContext interface {
// ClientID .
ClientID() string
// ClientLabels .
ClientLabels() map[string]string
// AppendInterest .
AppendInterest(item *apiconfig.ClientConfigFileInfo)
// RemoveInterest .
RemoveInterest(item *apiconfig.ClientConfigFileInfo)
// ShouldNotify .
ShouldNotify(event *model.SimpleConfigFileRelease) bool
// Reply .
Reply(rsp *apiconfig.ConfigClientResponse)
// Close .
Close() error
// ShouldExpire .
ShouldExpire(now time.Time) bool
// ListWatchFiles
ListWatchFiles() []*apiconfig.ClientConfigFileInfo
// IsOnce
IsOnce() bool
}
WatchContext
接口有以下3个具体实现:long poll
它,出现了……
我们先只关注第2项:polaris下的watch.go(不考虑nacos)
type LongPollWatchContext struct {
clientId string
labels map[string]string
once sync.Once
finishTime time.Time
finishChan chan *apiconfig.ConfigClientResponse
watchConfigFiles map[string]*apiconfig.ClientConfigFileInfo
betaMatcher BetaReleaseMatcher
}
2.再看 eventhub.Subscribe(eventhub.ConfigFilePublishTopic, wc, eventhub.WithQueueSize(QueueSize))
:
方法本身在上篇已经介绍:发布订阅模式,再贴出流程图:
这里我们先看看入参的值:见名知意:
第3个参数:队列长度:10240,还有那30秒,正是我们在Polaris系列-00.前言
篇中谈到“入坑”的缘由之一……
最后看第2个参数:wc
,它实现了Handler
接口:
所以依据上述流程,我们先看watchcenter
监听到文件内容改变后是如何处理的:
先看消息数据模型:event.Message
type ConfigFileReleaseKey struct {
Id uint64
Name string
Namespace string
Group string
FileName string
ReleaseType ReleaseType
}
type ReleaseType string
const (
// ReleaseTypeFull 全量类型
ReleaseTypeFull = ""
// ReleaseTypeGray 灰度类型
ReleaseTypeGray = "gray"
)
清楚了数据结构之后,我们再看实现:wc.notifyToWatchers(event.Message)
FYI: 省略了例外判断逻辑:
func (wc *watchCenter) notifyToWatchers(publishConfigFile *model.SimpleConfigFileRelease) {
// namespace + group + filename 拼接起来
watchFileId := utils.GenFileId(publishConfigFile.Namespace, publishConfigFile.Group, publishConfigFile.FileName)
// watchers是一个map: fileId -> []clientId
clientIds, _ := wc.watchers.Load(watchFileId)
// 包装通知客户端入参模型:见下面解释说明1
changeNotifyRequest := publishConfigFile.ToSpecNotifyClientRequest()
// 配置变更客户端响应对象
response := api.NewConfigClientResponse(apimodel.Code_ExecuteSuccess, changeNotifyRequest)
notifyCnt := 0
// 获取clientIds此刻的一个数据快照,再遍历之:
clientIds.Range(func(clientId string) {
// clientId -> watchContext
watchCtx, _ := wc.clients.Load(clientId)
// 是否要通知:见下面解释2
if watchCtx.ShouldNotify(publishConfigFile) {
// 解释3
watchCtx.Reply(response)
notifyCnt++
// 只能用一次,通知完就要立马清理掉这个 WatchContext
// 只有StreamWatchContext的IsOnce()实现才是false
if watchCtx.IsOnce() {
// 疑问4
wc.RemoveAllWatcher(watchCtx.ClientID())
}
}
})
// 日志输出
}
解释说明1:
解释说明2: 后面新版本console中配置文件变更时会弹出框,让我们填写版本,防止我们无脑地随便更新……
func (c ConfigFileReleaseKey) FileKey() string {
return fmt.Sprintf("%v@%v@%v", c.Namespace, c.Group, c.FileName)
}
解释3: 往finishChan
chan中投递数据,看来早有人在守着此chan,等待读取里面数据:
func (c *LongPollWatchContext) Reply(rsp *apiconfig.ConfigClientResponse) {
c.once.Do(func() {
c.finishChan <- rsp
close(c.finishChan)
})
}
我们看看谁调用下面这2个方法(之一):
果然有客户端(而且只有一处调用)
在监听等待结果。我们先不在这里去深入研究,只是确定一件事情:有人在监听此chan,获取配置变更最新结果。而上图另一个GetNotifieResultWithTime(...)
只出现在client_test.go
文件中,长舒一口气…
疑问, 从上述代码中:wc.RemoveAllWatcher(watchCtx.ClientID())
用了一次watchCtx
后就删除了,这……那后续文件再变更怎么办?我们接着看…
3.go wc.startHandleTimeoutRequestWorker(ctx)
:每秒检测,移除过期的订阅者:
func (c *LongPollWatchContext) ShouldExpire(now time.Time) bool {
return now.After(c.finishTime)
}
type LongPollWatchContext struct {
clientId string
labels map[string]string
once sync.Once
finishTime time.Time
finishChan chan *apiconfig.ConfigClientResponse
watchConfigFiles map[string]*apiconfig.ClientConfigFileInfo
betaMatcher BetaReleaseMatcher
}
看来我们丢失了一些环节,比如s.clients
是什么时候/如何 关联上值的?
还有上面提出的那个疑问:后续再有文件变更又该如何,毕竟你都移除了watchCtx
…
我们先把整体梳理完,再尝试去找答案…
初始化配置中心核心服务 Server数据模型后,往下:
进入第一个代理:上上图的第132行:
第二层代理:
至此,配置中心初始化大概流程介绍完毕。下一篇从下面蓝行开始:
我们回头看看上面留的两个问题:
s.clients
是什么时候/如何 关联上值的?- 后续再有文件变更又该如何,毕竟你都移除了
watchCtx
…
先看2: 既然有删除,那肯定有添加:
查找调用点:nacos我们不看,只看第2个红框:
此方法实现的接口:
我们看看上上图的第113行:
1.先看第3个参数:默认30秒超时,正是对应hold住请求30秒:
进入AddWatcher(...)
:
上面的超时30秒
结合上方中的 3.go wc.startHandleTimeoutRequestWorker(ctx)
:每秒检测,移除过期的订阅者方法 正是 官方wiki文档 Polaris系列00篇
说到的 默认hold住请求30秒…
我们本着尝试解答疑问2,没想到把疑问1也解答了。。。
最后我们再看看谁调用了LongPullWatchFile(...)
: 两种实现: http + grpc:
最后,我还是有些说服不了自己,文件变更后,移除了watchCtx
, 那是不是客户端就主动再发起监听呢,这样咱们Polaris- server服务器才能在配置文件变更时再次推送给客户端?只能理解成这就是一个http/grpc请求,既然在这一次请求中收到配置文件内容变更,那就算是完成了本次监听的使命,后续要监听的话,客户端只能主动再发起请求监听。如果知情者看到此疑问,请不吝解惑,谢谢。
今天周六,加班的感觉。。。有点顶… 状态不佳,最后祝大家周末愉快…