本篇分析 配置中心模块 启动流程:
先看启动配置参数:
 
 进入方法:
 
 先看看配置中心服务数据模型:初始化也是围绕着下面各个属性赋值...
// Server 配置中心核心服务
type Server struct {
	cfg *Config
	storage           store.Store
	fileCache         cachetypes.ConfigFileCache
	groupCache        cachetypes.ConfigGroupCache
	grayCache         cachetypes.GrayCache
	caches            cachetypes.CacheManager
	watchCenter       *watchCenter
	namespaceOperator namespace.NamespaceOperateServer
	initialized       bool
	history       plugin.History
	cryptoManager plugin.CryptoManager
	hooks         []ResourceHook
	// chains
	chains *ConfigChains
	sequence int64
}
初始化有幂等性,保证初始化一次;
 和服务发现模块一样,也有服务代理proxySvr,也保存了原始的服务originServer实例对象:
// Initialize 初始化配置中心模块
func Initialize(ctx context.Context, config Config, s store.Store, cacheMgr cachetypes.CacheManager,
	namespaceOperator namespace.NamespaceOperateServer) error {
	if originServer.initialized {
		return nil
	}
	proxySvr, originSvr, err := doInitialize(ctx, config, s, cacheMgr, namespaceOperator)
	if err != nil {
		return err
	}
	originServer = originSvr
	server = proxySvr
	return nil
}
所以我们去看看是如何doInitialize(...)的:
 
再看上图中的第118行:originSvr.initialize(...): 点进去:
 
 
1.看看watchCenter结构体:

type SubscribtionContext struct {
	subID  string
	cancel context.CancelFunc
}
WatchContext interface {
	// ClientID .
	ClientID() string
	// ClientLabels .
	ClientLabels() map[string]string
	// AppendInterest .
	AppendInterest(item *apiconfig.ClientConfigFileInfo)
	// RemoveInterest .
	RemoveInterest(item *apiconfig.ClientConfigFileInfo)
	// ShouldNotify .
	ShouldNotify(event *model.SimpleConfigFileRelease) bool
	// Reply .
	Reply(rsp *apiconfig.ConfigClientResponse)
	// Close .
	Close() error
	// ShouldExpire .
	ShouldExpire(now time.Time) bool
	// ListWatchFiles
	ListWatchFiles() []*apiconfig.ClientConfigFileInfo
	// IsOnce
	IsOnce() bool
}
WatchContext接口有以下3个具体实现:long poll 它,出现了……
 
 我们先只关注第2项:polaris下的watch.go(不考虑nacos)
type LongPollWatchContext struct {
	clientId         string
	labels           map[string]string
	once             sync.Once
	finishTime       time.Time
	finishChan       chan *apiconfig.ConfigClientResponse
	watchConfigFiles map[string]*apiconfig.ClientConfigFileInfo
	betaMatcher      BetaReleaseMatcher
}
2.再看 eventhub.Subscribe(eventhub.ConfigFilePublishTopic, wc, eventhub.WithQueueSize(QueueSize)):
 方法本身在上篇已经介绍:发布订阅模式,再贴出流程图:
 
 这里我们先看看入参的值:见名知意:
 
 第3个参数:队列长度:10240,还有那30秒,正是我们在Polaris系列-00.前言 篇中谈到“入坑”的缘由之一……
 
 最后看第2个参数:wc,它实现了Handler接口:
 
 
 所以依据上述流程,我们先看watchcenter监听到文件内容改变后是如何处理的:
 先看消息数据模型:event.Message
 
type ConfigFileReleaseKey struct {
	Id          uint64
	Name        string
	Namespace   string
	Group       string
	FileName    string
	ReleaseType ReleaseType
}
type ReleaseType string
const (
	// ReleaseTypeFull 全量类型
	ReleaseTypeFull = ""
	// ReleaseTypeGray 灰度类型
	ReleaseTypeGray = "gray"
)
清楚了数据结构之后,我们再看实现:wc.notifyToWatchers(event.Message)
 FYI: 省略了例外判断逻辑:
func (wc *watchCenter) notifyToWatchers(publishConfigFile *model.SimpleConfigFileRelease) {
	// namespace + group + filename 拼接起来
	watchFileId := utils.GenFileId(publishConfigFile.Namespace, publishConfigFile.Group, publishConfigFile.FileName)
	// watchers是一个map: fileId -> []clientId
	clientIds, _ := wc.watchers.Load(watchFileId)
	// 包装通知客户端入参模型:见下面解释说明1
	changeNotifyRequest := publishConfigFile.ToSpecNotifyClientRequest()
	// 配置变更客户端响应对象
	response := api.NewConfigClientResponse(apimodel.Code_ExecuteSuccess, changeNotifyRequest)
	notifyCnt := 0
	// 获取clientIds此刻的一个数据快照,再遍历之:
	clientIds.Range(func(clientId string) {
		// clientId -> watchContext
		watchCtx, _ := wc.clients.Load(clientId)
		// 是否要通知:见下面解释2
		if watchCtx.ShouldNotify(publishConfigFile) {
			// 解释3
			watchCtx.Reply(response)
			notifyCnt++
			// 只能用一次,通知完就要立马清理掉这个 WatchContext
			// 只有StreamWatchContext的IsOnce()实现才是false
			if watchCtx.IsOnce() {
				// 疑问4
				wc.RemoveAllWatcher(watchCtx.ClientID())
			}
		}
	})
	// 日志输出
}
解释说明1:
 
 解释说明2: 后面新版本console中配置文件变更时会弹出框,让我们填写版本,防止我们无脑地随便更新……
 
func (c ConfigFileReleaseKey) FileKey() string {
	return fmt.Sprintf("%v@%v@%v", c.Namespace, c.Group, c.FileName)
}
解释3: 往finishChan chan中投递数据,看来早有人在守着此chan,等待读取里面数据:
func (c *LongPollWatchContext) Reply(rsp *apiconfig.ConfigClientResponse) {
	c.once.Do(func() {
		c.finishChan <- rsp
		close(c.finishChan)
	})
}
我们看看谁调用下面这2个方法(之一):
 
 果然有客户端(而且只有一处调用)在监听等待结果。我们先不在这里去深入研究,只是确定一件事情:有人在监听此chan,获取配置变更最新结果。而上图另一个GetNotifieResultWithTime(...)只出现在client_test.go文件中,长舒一口气…
 
 疑问, 从上述代码中:wc.RemoveAllWatcher(watchCtx.ClientID())
 用了一次watchCtx后就删除了,这……那后续文件再变更怎么办?我们接着看…
3.go wc.startHandleTimeoutRequestWorker(ctx):每秒检测,移除过期的订阅者:
 
func (c *LongPollWatchContext) ShouldExpire(now time.Time) bool {
	return now.After(c.finishTime)
}
type LongPollWatchContext struct {
	clientId         string
	labels           map[string]string
	once             sync.Once
	finishTime       time.Time
	finishChan       chan *apiconfig.ConfigClientResponse
	watchConfigFiles map[string]*apiconfig.ClientConfigFileInfo
	betaMatcher      BetaReleaseMatcher
}
看来我们丢失了一些环节,比如s.clients 是什么时候/如何 关联上值的?
 还有上面提出的那个疑问:后续再有文件变更又该如何,毕竟你都移除了watchCtx…
我们先把整体梳理完,再尝试去找答案…
初始化配置中心核心服务 Server数据模型后,往下:
 
 
 进入第一个代理:上上图的第132行:
 
 
 第二层代理:
 
 
 至此,配置中心初始化大概流程介绍完毕。下一篇从下面蓝行开始:
 
我们回头看看上面留的两个问题:
- s.clients是什么时候/如何 关联上值的?
- 后续再有文件变更又该如何,毕竟你都移除了watchCtx…
先看2: 既然有删除,那肯定有添加:
 
 查找调用点:nacos我们不看,只看第2个红框:
 
 
 此方法实现的接口:
 
 我们看看上上图的第113行:
 1.先看第3个参数:默认30秒超时,正是对应hold住请求30秒:
 
 进入AddWatcher(...):
 
 上面的超时30秒 结合上方中的 3.go wc.startHandleTimeoutRequestWorker(ctx):每秒检测,移除过期的订阅者方法 正是 官方wiki文档 Polaris系列00篇说到的 默认hold住请求30秒…
我们本着尝试解答疑问2,没想到把疑问1也解答了。。。
最后我们再看看谁调用了LongPullWatchFile(...): 两种实现: http + grpc:
 
 
 最后,我还是有些说服不了自己,文件变更后,移除了watchCtx, 那是不是客户端就主动再发起监听呢,这样咱们Polaris- server服务器才能在配置文件变更时再次推送给客户端?只能理解成这就是一个http/grpc请求,既然在这一次请求中收到配置文件内容变更,那就算是完成了本次监听的使命,后续要监听的话,客户端只能主动再发起请求监听。如果知情者看到此疑问,请不吝解惑,谢谢。
今天周六,加班的感觉。。。有点顶… 状态不佳,最后祝大家周末愉快…



















