Dubbogo 详解
简介
dubbo功能很强大的微服务开发框架,支持多种通信协议,并具有流量治理的功能。
dubbo在有了大转变,拥抱了云原生,从哪些方面可以体现呢?
- 推出了自己的
Trip
协议 - 修复了服务发现的级别,之前是方法级别,现在是服务级别
- 提供了服务发现,流量控制和XDS的结合
当然,dubbo还是那个dubbo,原来的配置都有,并且支持跨语言通信,并且方便我们开发,提供了cli的工具来使用,所以我们的通信变成了下面的样子
- 先写IDL
- 实现对应的服务,调用对应的接口。
dubbo有对应的示例代码:
https://github.com/apache/dubbo-go-samples
dubbo的架构和分层
我想引用官网上的图来清晰的说明,dubbo的抽象是很重要的。
dubbo中有几个重要的接口和对象
- invoker:表示一个可调用的抽象对象,不管是从loadBalance,还是从cluster,还是dictionary,都表示的是一个invoker。
- Invocation:调用的上下文。通过它可以调用此次调用时候的参数名字,参数类型,返回值等等一系列的数据
具体的可以参考官网:
https://cn.dubbo.apache.org/zh-cn/overview/mannual/java-sdk/reference-manual/architecture/code-architecture/
这是从dubbo的官网上copy的图,源码的分析按照这个图来看,很清晰,可能会有一点点的不一样,但这也没太大的问题。
分为服务端和客户端:
服务端:构建代理对象,接受服务端的请求。
客户端:构建代理对象,请求服务端。
这里需要处理几个问题
-
怎么在服务端和客户端调用的基础上,将dubbo这一套东西加上去
每个通信协议在dubbo中都有自己的通信协议的实现,当让他们也是一个invoker,它作为baseInvoker,在此基础上增加各种各样的invoker(cluster,loadbalance,filter等)。
对于客户端而言,也有baseInvoker,客户端而言,只有filter。
-
go中是没有动态代理的概念的,所以,对于客户端来说,怎么生成代理对象。
代理对象不能再运行的时候生成,那可以提早生成。
dubbo提供IDL的工具实现了此功能,提早已经生成了好了代理对象,用来做真正的调用。
并且生成了一个对应的ClientImpl,它是一个结构体,结构体中的属性是对应的方法,此时就可以利用GO的反射来动态的修改属性,这样在构建的时候就做到了。
服务端创建一个exporter开始
code:
type GreeterProvider struct {
api.UnimplementedGreeterServer
}
func (s *GreeterProvider) SayHello(ctx context.Context, in *api.HelloRequest) (*api.User, error) {
logger.Infof("Dubbo3 GreeterProvider get user name = %s\n", in.Name)
return &api.User{Name: "Hello " + in.Name, Id: "12345", Age: 21}, nil
}
// export DUBBO_GO_CONFIG_PATH= PATH_TO_SAMPLES/direct/go-server/conf/dubbogo.yml
func main() {
config.SetProviderService(&GreeterProvider{})
if err := config.Load(); err != nil {
panic(err)
}
select {}
}
将对应的实现类提供给dubbo,并且启动。
-
注册到dubbo中
在整个dubbo没有启动之前,这里的注册只是放在一个map中,需要保证key是唯一。
-
启动
启动分为五步
- 初始化config
- 初始化provider
- 初始化consumer
- 初始化shutdown
初始化好此provider的配置,配置叫做ServiceConfig
调用Export
来做真正的服务创建操作。
会通过具体的通信协议创建对应的invoker,并且结合filter。并且将服务注册到注册中心。
总体流程是这样,现在回答几个问题:
- dubbo中是怎么调用到真正的provider,也就是说dubbo是怎么和provider关联的。
- Invoker(调用者)是怎么构建的?
问题一:
dubbo是通过反射来调用的。
原理如下:
dubbo的trip协议是grpc的基础上改进的,所以要从这一点入手,从grpc我们知道,在启动的时候要注册相应的服务,在dubbo这里,从生产的pb文件中入手
会将此服务描述信息告诉grpc,当调用来了之后,就会调用到对应的方法中。这里我们以_Greeter_SayHello_Handler
为例,在此方法中,会构建invocation
,然后在调用dubbo构建的invoker
对象,就完成连接了。
代理对象是什么?
代理对象就是dubbo生成的invoker,在构建好invoker之后,会将它作为属性放在provider中,之后在客户端调用的时候可以通过反射来调用对应的方法。
dubbo为grpc的服务规定了接口。通过此接口就可以获取代理对象,获取服务的描述性息
并且官网提供了可拓展点和完整的调用说明:
https://cn.dubbo.apache.org/zh-cn/overview/mannual/java-sdk/reference-manual/architecture/service-invocation/
在什么时候注册grpc服务,并且设置代理对象的?
在构建好invoker对象之后,会调用XXX_SetProxyImpl
方法,设置值。
问题 二:
invoker是dubbo中很重要的概念,表示的是一个可调用者,通过这个概念,dubbo高度抽象了服务,
官网:
https://cn.dubbo.apache.org/zh-cn/overview/mannual/java-sdk/advanced-features-and-usage/service/fault-tolerent-strategy/
invoker表示可用的服务,不管是loadbalance、router、director等,都是一个可用的invoker。
在抽象的invoker都有基础的实现,剩下的filter,cluster都是基于此继续操作的。我们来看invoker的构建过程
在准备好服务方的配置之后,就开始挨个启动服务了,每个服务的定义信息叫做ServiceConfig
,调用Export
来初始化此服务,并且注册到注册中心。
我们知道dubbo中服务的定义信息传递全都靠url,在Export
中,主要有下面的几个步骤
- 获取注册中心,创建代理工厂(用于代理对象的创建)
- 构建url
- 通过不同的通信协议构建不同的Invoker,并且启动底层具体通信协议对应的服务器。
- 服务发布到注册中心
这里我们只说Invoker的构建。对应到代理里面如下:
invoker := proxyFactory.GetInvoker(ivkURL) // 通过代理工厂创建对应的代理类
exporter := extension.GetProtocol(protocolwrapper.FILTER).Export(invoker) // 从extension中获取对应的底层通信协议来构建invoker
先看代理工厂如何创建Invoker
基础的调用就是ProxyInvoker
,等请求来了之后,通过一系列filter,会走到这里,通过反射来调用具体的方法。
不同的通信协议如何构建invoker
extension
是一个拓展点,也是通过它逐渐的让dubbo丰满起来。
filter对应的wrapper是ProtocolFilterWrapper
,通过它基于ProxyInvoker
来构建最终的Invoker,最后通过最终的通信协议的来构建最终的Invoker,从而启动服务器,注册服务。
在dubbo的protocol包下面列举出了支持的通信协议
我们来查看dubbo3种trip的通信协议
上面已经介绍过了,主要是确定序列化类型,设置invoker代理。启动服务器
还有一个问题
最终是怎么通过反射调用的,并且何时设置的反射呢
直接去ProxyInvoker
中查看invoke
方法
这里对反射要说明一下,反射调用的时候需要确定参数的个数,结合实际,参数个数就一个或者两个,其中必须用context,剩下的要么是无参,要么就一个request对象。
查看serviceMap.Register
方法可知,在注册的时候会解析此结构体中的方法,并且设置封装为Service
对象
客户端创建一个Ref开始
客户端的构建思路和上面的差不多,不过多了注册中心,cluster的构建。
还是从几个问题开始
- 代理对象的创建和关联
- invoker对象的构建流程
整个流程从ReferenceConfig.Refer
方法开始
代理对象的创建
按照dubbo的逻辑,对象的创建必须去proxyFactory
中,但这里是整个filter,cluster构建好了之后,在基于此构建代理对象,因为这里的代理对象是最终给用户侧来调用的,所以,肯定是放在最后一步的。
最终的入口在DefaultProxyFactory.GetProxy
中,此方法中会创建Proxy
对象
type Proxy struct {
rpc common.RPCService // 代理还要保留origin
invoke protocol.Invoker // 生成的invoke对象,这是代表整个dubbo服务的invoke,此invoke中包含了 cluster,load等等流量治理的功能
callback interface{}
attachments map[string]string
implement ImplementFunc // 动态函数,这是一个函数类型,会通过此函数来实现普通服务和dubbo服务的对接
once sync.Once
}
这里只是设置了proxy,并没有做方法绑定操作,真正的是在ReferenceConfig.Implement
中实现的。他会调用到刚刚创建的proxy对象中的implement方法,从而给客户端动态绑定属性,达到调用dubbo服务的功能。这是重点,我们源码分析一下
源码:proxy.DefaultProxyImplementFunc
func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) {
// 开始反射操作
valueOf := reflect.ValueOf(v)
valueOfElem := valueOf.Elem()
// 这是重点,最终会将客户端中的属性方法,都设置为此函数,然后在此函数里面做调用。
// 当我们调用方法的时候,会调用到这里来,
makeDubboCallProxy := func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value {
return func(in []reflect.Value) []reflect.Value {
var (
err error
inv *invocation_impl.RPCInvocation
inIArr []interface{}
inVArr []reflect.Value
reply reflect.Value
replyEmptyFlag bool
)
if methodName == "Echo" {
methodName = "$echo"
}
// 确定参数和返回值等信息
if len(outs) == 2 { // return (reply, error)
if outs[0].Kind() == reflect.Ptr {
reply = reflect.New(outs[0].Elem())
} else {
reply = reflect.New(outs[0])
}
} else { // only return error
replyEmptyFlag = true
}
start := 0
end := len(in)
invCtx := context.Background()
// retrieve the context from the first argument if existed
if end > 0 {
if in[0].Type().String() == "context.Context" {
if !in[0].IsNil() {
// the user declared context as method's parameter
invCtx = in[0].Interface().(context.Context)
}
start += 1
}
}
if end-start <= 0 {
inIArr = []interface{}{}
inVArr = []reflect.Value{}
} else if v, ok := in[start].Interface().([]interface{}); ok && end-start == 1 {
inIArr = v
inVArr = []reflect.Value{in[start]}
} else {
inIArr = make([]interface{}, end-start)
inVArr = make([]reflect.Value, end-start)
index := 0
for i := start; i < end; i++ {
inIArr[index] = in[i].Interface()
inVArr[index] = in[i]
index++
}
}
inv = invocation_impl.NewRPCInvocationWithOptions(invocation_impl.WithMethodName(methodName),
invocation_impl.WithArguments(inIArr),
invocation_impl.WithCallBack(p.callback), invocation_impl.WithParameterValues(inVArr))
if !replyEmptyFlag {
inv.SetReply(reply.Interface()) // 这里的reply就是response
}
for k, value := range p.attachments {
inv.SetAttachment(k, value)
}
// add user setAttachment. It is compatibility with previous versions.
atm := invCtx.Value(constant.AttachmentKey)
if m, ok := atm.(map[string]string); ok {
for k, value := range m {
inv.SetAttachment(k, value)
}
} else if m2, ok2 := atm.(map[string]interface{}); ok2 {
// it is support to transfer map[string]interface{}. It refers to dubbo-java 2.7.
for k, value := range m2 {
inv.SetAttachment(k, value)
}
}
// 调用服务,这里用的是闭包
result := p.invoke.Invoke(invCtx, inv)
err = result.Error()
// cause is raw user level error
cause := perrors.Cause(err)
if err != nil {
// if some error happened, it should be log some info in the separate file.
if throwabler, ok := cause.(java_exception.Throwabler); ok {
logger.Warnf("[CallProxy] invoke service throw exception: %v , stackTraceElements: %v", cause.Error(), throwabler.GetStackTrace())
} else {
// entire error is only for printing, do not return, because user would not want to deal with massive framework-level error message
logger.Warnf("[CallProxy] received rpc err: %v", err)
}
} else {
logger.Debugf("[CallProxy] received rpc result successfully: %s", result)
}
if len(outs) == 1 {
return []reflect.Value{reflect.ValueOf(&cause).Elem()}
}
if len(outs) == 2 && outs[0].Kind() != reflect.Ptr {
return []reflect.Value{reply.Elem(), reflect.ValueOf(&cause).Elem()}
}
return []reflect.Value{reply, reflect.ValueOf(&cause).Elem()}
}
}
if err := refectAndMakeObjectFunc(valueOfElem, makeDubboCallProxy); err != nil {
logger.Errorf("The type or combination type of RPCService %T must be a pointer of a struct. error is %s", v, err)
return
}
}
invoker的构建是什么样子?
对应的代码在ReferenceConfig.Refer
,
- 构建url
- 通过不同的通信协议构建不同的base invoker,对于trip来说就是
DubboInvoker
- 基于此Invoker,增加filter功能
- 增加cluster
- 构建代理对象
DubboInvoker是什么样子
cluster
dubbo提供的集群容错的策略,cluster中的交互逻辑如下:
可以看到,整体的流程是获取所有的可用的invoker,然后通过路由,负载均衡,选择出一个invoker来做调用。
官网链接:
https://cn.dubbo.apache.org/zh-cn/overview/mannual/java-sdk/advanced-features-and-usage/service/fault-tolerent-strategy/
源码:https://github.com/apache/dubbo-go/tree/main/cluster/cluster
支持的策略如下:
理解了上面的逻辑,这里的代码比较好理解。
failover
快速切换,规定了重试次数,超过之后就失败。
failfast
只调用一次。失败就失败了
failsafe
会打印错误日志,并且返回一个空对象回去
forking
并行调用(默认为2),主要有一个成功,就返回,规定超时时间(1s)就返回失败
broadcast
广播调用,等待所有的调用结束之后才返回
available
zoneaware
统一地区的优先访问,其实就是做load balance 之前增加了一层筛选
adaptivesvc
自适应的cluster,需要搭配p2c
load balance 使用,这个比较复杂,需要搭配filter使用。之后出文章详解。
failback
失败恢复,将失败的请求放在队列里面,重试,但没有返回值
这里需要一个机制,重试机制的实现
在Java中是放在时间轮里面实现的,go用ticker
实现定时触发,一秒一次。
默认重试3次,重试队列长度为100
重试队列
处理过程
当失败之后,就会将调用放在一个定时任务的队列里面,每隔一秒就会将所有的任务执行一次(任务的间隔必须大于5秒),如果失败之后,通过条件决定是否要放在队列中。
loadBalance
官网:https://cn.dubbo.apache.org/zh-cn/overview/core-features/load-balance/
源码:https://github.com/apache/dubbo-go/tree/main/cluster/loadbalance
负载均衡从一堆可用的invoker中选一个出来。
consistenthashing
一致性hash,确定的入参,确定的提供者,适用于有状态请求。
但当invoker发生了变化,会重新计算hash分配。这里需要构建hash环,说到hash环就得有虚拟节点,虚拟节点是为了更好的分配平均,在这里每个物理节点会对应160个虚拟节点(默认)
用crc生成一个hashcode,从而生成hash环,每个方法对应一个hash环,并且hashcode发生了变化,就会重构hash环。下面的代码分为三步
hash环构建
每个物理节点有160个虚拟节点,并且用节点的ip+端口生成address,之后分为两次循环,外层40,内层4次,生成hashcode,并且保存虚拟节点和物理节点的映射关系。最后对生成的hash环做排序,方便之后的查找。
hash环查找
通过key生成hashcode,然后用找到第一个大于等于当前hash值的key,之后返回对应的物理节点
hash函数
好的函数很重要,可以让请求分配的更加的均匀。
这我看不懂了,看看GPT的回答吧
leastactive
最近最少活跃,需要配合filter继续当前节点的活跃情况,如果有多个相同情况的invoke。就通过权重来选择一个,如果还有相同的,就随机选择一个。
关于随机匹配,会产生一个总权重的随机值,占的比重大的为负数的概率低。
random
加权随机,按照权重做随机
ringhash
和服务网格相关,这里不做过多介绍
roundrobin
加权轮询,它的算法和Nginx平滑加权轮询算法,默认权重相同。
https://cn.dubbo.apache.org/zh-cn/overview/core-features/load-balance/#roundrobin
具体可看官网的解释,主要是为了在保证轮询的情况下,尽可能的分布均匀。
轮询算法需要保存上一次选择的节点的状态。
这里分为两步
选择节点
保存节点状态
p2c
自适应的算法。
这种算法需要结合filter来实现,之后在详细的说
dubbogo项目分层很清晰,没有像Java那样花里胡哨的代码,也没有线程池,时间轮等这些算法,好理解。
到这里就结束了。dubbo