本文的背景使用的是 kratos 框架。
背景
众所周知 grpc 底层使用 http2 协议,而 http2 是一个长链接多路复用的。在正常情况下客服端与服务端一对一不会需要负载均衡手段;但是当服务上云之后为了保障服务的可用性所以我们服务端一般是多副本,这种情况下客户端通过服务端的 service 名称与服务端建立连接而 service 会将流量轮训到后端的某一个 pod 这样就会造成客户端只会与某一个 pod 建立连接并且所有的请求流量都会到一个 pod 上面,其他任意数量的 pod 都是摆设。
解决方法
在讲述解决办法之前先知道以下一些概念
- Headless Service
- GRPC Name Resolver
Headless Service
首先 headless svc 是 k8s 一种服务,但它不为自己分配 IP 地址。相反,会返回与 svc 关联的 pod 的 ip,允许客户端直连到单个 pod(客户端和 pod 之间直接通信)。
和正常/普通的 svc 的却别就是普通的 svc 会分配一个 vip 地址,然后流量会通过这个 vip 轮训到后端的一个 pod;headless svc 不会分配任何 ip 地址并且返回 headless svc 会返回 pods ip。
GRPC Name Resolver
grpc 一些基础概念
- Resolver:解析器,用于从注册中心实时获取当前服务端的列表,同步发送给 Balancer
- Balancer:平衡器,一是接收从 Resolver 发送的服务端列表,建立并维护(长)连接状态;二是每次当 client 发起 rpc 调用时,按照算法从连接池中选择一个连接进行 rpc 调用
- Register:注册,用于服务端初始化将自己信息上报到注册中心,主要信息有 ip、port
本文没有注册中心,使用 k8s api
简单的来说 Name Resolver(名称解析器)可以看作是一个 map[service-name][]backend-ip。它接收一个服务名称,并返回后端的 IP 列表。gRPC 中根据目标字符串中的 scheme 选择名称解析器。
解决方法的本质就是通过 headless svc + Name Resolver 自定一个解析器。
使用方法
通过 kuberesolver 程序实现;kuberesolver 是一个使用 k8s api 的 GRPC Name Resolver。
- 创建 headless svc
- 改造代码
apiVersion: v1
kind: Service
metadata:
name: headless
namespace: oasis-dev
spec:
ports:
- name: http
port: 8000
protocol: TCP
targetPort: 8000
- name: grpc
port: 9000
protocol: TCP
targetPort: 9000
selector:
app: oasis-dev-prizesinfrastructure
type: ClusterIP
改造源码
import "github.com/sercand/kuberesolver/v4"
kuberesolver.RegisterInCluster()
ep := fmt.Sprintf("kubernetes:///%s.%s:%d", service, namespace, port)
conn, err := transgrpc.DialInsecure(
context.Background(),
transgrpc.WithEndpoint(ep),
transgrpc.WithTimeout(60*time.Second),
transgrpc.WithMiddleware(
recovery.Recovery(),
tracing.Client(),
),
)
使用缺点
- 使用 kuberesolver 只能在集群内运行
- 使用 kuberesolver 部署需要一个 service account
原理
上文讲到 kuberesolver 内部自定义了一个名称解析器;其中 kubeBuilder 和 kResolver 结构体分别实现了 grpc 的 Builder 接口和 Resolver 接口。
注册解析器
// RegisterInCluster registers the kuberesolver builder to grpc with kubernetes schema
// kubernetesSchema = kubernetes
func RegisterInCluster() {
RegisterInClusterWithSchema(kubernetesSchema)
}
// RegisterInClusterWithSchema registers the kuberesolver builder to the grpc with custom schema
func RegisterInClusterWithSchema(schema string) {
resolver.Register(NewBuilder(nil, schema))
}
// NewBuilder creates a kubeBuilder which is used by grpc resolver.
func NewBuilder(client K8sClient, schema string) resolver.Builder {
return &kubeBuilder{
k8sClient: client,
schema: schema,
}
}
grpc.DialInsecure 或者 grpc.Dial 函数中会先根据 kubernetes 这个 scheme 找到我们通过 RegisterInCluster 函数注册的 kubeBuilder,然后调用它的 Build() 方法构建我们自定义的 kResolver。kResolver 的 watch 方法一直以协程的方式监听我们指定的 headless svc 对应的 endpoints(如果 pod 上面或者销毁会被程序立即感知到)。
func (b *kubeBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
if b.k8sClient == nil {
if cl, err := NewInClusterK8sClient(); err == nil {
b.k8sClient = cl
} else {
return nil, err
}
}
ti, err := parseResolverTarget(target)
if err != nil {
return nil, err
}
if ti.serviceNamespace == "" {
ti.serviceNamespace = getCurrentNamespaceOrDefault()
}
ctx, cancel := context.WithCancel(context.Background())
r := &kResolver{
target: ti,
ctx: ctx,
cancel: cancel,
cc: cc,
rn: make(chan struct{}, 1),
k8sClient: b.k8sClient,
t: time.NewTimer(defaultFreq),
freq: defaultFreq,
endpoints: endpointsForTarget.WithLabelValues(ti.String()),
addresses: addressesForTarget.WithLabelValues(ti.String()),
}
go until(func() {
r.wg.Add(1)
err := r.watch()
if err != nil && err != io.EOF {
grpclog.Errorf("kuberesolver: watching ended with error='%v', will reconnect again", err)
}
}, time.Second, ctx.Done())
return r, nil
}
watch 中一共有三个地方会触发 grpc address 的更新分别如下
- k.t.C:定时器触发 30 min 一次
- k.rn:ResolveNow 触发
- up, hasMore := <-sw.ResultChan():streamWatcher 有新消息(事件)通知时,调用 k.handle(up.Object) 处理事件,即监听的资源发生了改变(创建/销毁/更新)
func (k *kResolver) watch() error {
defer k.wg.Done()
// watch endpoints lists existing endpoints at start
sw, err := watchEndpoints(k.ctx, k.k8sClient, k.target.serviceNamespace, k.target.serviceName)
if err != nil {
return err
}
for {
select {
case <-k.ctx.Done():
return nil
case <-k.t.C:
k.resolve()
case <-k.rn:
k.resolve()
case up, hasMore := <-sw.ResultChan():
if hasMore {
k.handle(up.Object)
} else {
return nil
}
}
}
}
func (k *kResolver) ResolveNow(resolver.ResolveNowOptions) {
select {
case k.rn <- struct{}{}:
default:
}
}
以上三种途径最后都会通过 k.handle() 处理。
func (k *kResolver) resolve() {
e, err := getEndpoints(k.k8sClient, k.target.serviceNamespace, k.target.serviceName)
if err == nil {
k.handle(e)
} else {
grpclog.Errorf("kuberesolver: lookup endpoints failed: %v", err)
}
// Next lookup should happen after an interval defined by k.freq.
k.t.Reset(k.freq)
}
k.makeAddresses 会循环 endpoints 的 subsets 返回一个 []resolver.Address 用于解析器调用 NewAddress 来通知 ClientConn 一个新的解析地址列表(即服务列表更新通知接口)
func (k *kResolver) handle(e Endpoints) {
result, _ := k.makeAddresses(e)
// k.cc.NewServiceConfig(sc)
if len(result) > 0 {
k.cc.NewAddress(result)
}
k.endpoints.Set(float64(len(e.Subsets)))
k.addresses.Set(float64(len(result)))
}