【深入理解SpringCloud微服务】Spring-Cloud-OpenFeign源码解析(下)——LoadBalancerFeignClient详解
- RxJava简单介绍
- RxJava示例
- Observable与Subscriber
- 相关方法介绍
- Observable.create(OnSubscribe)
- Observable#just(T value)
- Observable#concatMap(Func1<? super T, ? extends Observable<? extends R>> func)
- Observable#toBlocking()
- Observable#single()
- Observer#onNext(T t)
- Observer#onError(Throwable e)
- Observer#onCompleted()
- LoadBalancerFeignClient源码解析
- LoadBalancerFeignClient#execute()
- AbstractLoadBalancerAwareClient#executeWithLoadBalancer()
- LoadBalancerCommand#submit()
- LoadBalancerCommand#selectServer()
- LoadBalancerContext#getServerFromLoadBalancer()
- FeignLoadBalancer#execute()
- Client.Default#execute()
RxJava简单介绍
由于LoadBalancerFeignClient里面使用到了RxJava ,因此我们要先了解一下RxJava ,RxJava 是一个在Java平台上实现响应式编程的库,用于处理异步数据流。
RxJava示例
public class RxJavaExample {
public static void main(String[] args) {
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("str1");
subscriber.onNext("str2");
subscriber.onNext("str3");
subscriber.onCompleted();
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("onError: " + e);
}
@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}
});
}
}
这段代码首先调用Observable.create(…)方法创建了一个Observable对象,方法参数OnSubscribe定义了Observable被订阅时触发的动作:直接发射一组预定义的字符串。然后调用subscribe(…)方法订阅这个Observable,并通过方法参数Subscriber定义了接收到发射的数据、发生错误时和完成时的回调方法。
Observable与Subscriber
Observable代表可观察的对象,可以被多个观察者订阅并接收通知,提供了subscribe()方法,用于观察者进行订阅,当被观察对象Observable发生变化时,通知观察者。
Subscriber实现了Observer接口,因此Subscriber就是观察者,具备了观察者模式中的观察者角色的能力,可以注册到被观察对象Observable上,以接收状态更新的通知。
相关方法介绍
我们看一下LoadBalancerFeignClient里面用到的RxJava的相关方法,其他的RxJava方法我们就不研究了,因为重点是LoadBalancerFeignClient的源码,而不是学习RxJava。
Observable.create(OnSubscribe)
创建一个Observable对象。Observable是RxJava中的一个类,它允许你创建异步数据流,并通过订阅的方式处理数据。在创建Observable时,通过实现OnSubscribe接口的call方法来定义数据的产生方式。
Observable#just(T value)
创建一个Observable对象,并指定value为其发射的对象。
Observable#concatMap(Func1<? super T, ? extends Observable<? extends R>> func)
concatMap方法用于将一个Observable发射的每个元素变成另一个Observable发射的一系列元素。而其中的Func1可以用lambda表达式定义,是一个函数,该函数接收一个前一个Observable发射的对象,返回一个新的Observable对象,并将这个Observable对象发射的元素序列与前一个Observable对象发射的元素序列进行合并。
Observable#toBlocking()
用于将Observable对象转换为BlockingObservable对象。 BlockingObservable是RxJava中的一个阻塞式观察者模式的实现,它可以在订阅时阻塞当前线程,直到被观察者对象发送完所有数据或发生错误。
Observable#single()
用于从Observable对象中获取单个元素并返回。它要求Observable对象只能发送一个元素,如果发送了多个元素,则会抛出异常。如果Observable对象没有发送任何元素,则会返回一个空值。single()方法内部会调用Observable的subcribe()方法触发监听。
Observer#onNext(T t)
Observer是Subscriber实现的接口,该方法用于被观察者为观察者发射要观察的对象。
Observer#onError(Throwable e)
该方法用于被观察者通知观察者出现了错误。
Observer#onCompleted()
该方法用于被观察者通知观察者已经完成所有对象的发射。
LoadBalancerFeignClient源码解析
LoadBalancerFeignClient其实就是干了以下三件事:
- 负载均衡:通过Ribbon的ILoadBalancer的chooseServer()方法选出一个实例
- 重写url:根据选出的实例重构URI
- 发起http请求:默认使用HttpURLConnection,返回Response对象
LoadBalancerFeignClient#execute()
public Response execute(Request request, Request.Options options) throws IOException {
try {
...
// 调用FeignLoadBalancer的executeWithLoadBalancer方法
// 进行负载均衡,重写url,发出http请求
return lbClient(clientName)
.executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
}
catch (...) {...}
}
lbClient(clientName)返回FeignLoadBalancer,然后调用FeignLoadBalancer的executeWithLoadBalancer方法,进入com.netflix.client.AbstractLoadBalancerAwareClient#executeWithLoadBalancer()
AbstractLoadBalancerAwareClient#executeWithLoadBalancer()
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
// 创建一个负载均衡命令对象
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
try {
// 调用命令对象的submit方法,返回一个Observable对象
// 里面会进行负载均衡,选出一个实例,然后会调用下面的ServerOperation的call方法。
return command.submit(
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
// 根据负载均衡选出的实例,重构URI
URI finalUri = reconstructURIWithServer(server, request.getUri());
// 替换请求对象中的URI为重构后的URI
S requestForServer = (S) request.replaceUri(finalUri);
try {
// 发出http请求
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
// 将Observable对象转换为BlockingObservable对象,可以在订阅时阻塞当前线程,直到被观察者对象发送完所有数据或发生错误。
.toBlocking()
// 从Observable对象中获取单个元素并返回
// 里面会调用Observable的subscribe()方法真正触发监听
.single();
} catch (...) {...}
}
首先创建一个LoadBalancerCommand对象,然后调用它的submit方法,返回一个Observable对象。这个Observable当被监听时,会进行负载均衡,选出一个实例,然后会调用ServerOperation的call方法。
ServerOperation的call方法里面根据负载均衡选出的实例重构URI,替换请求对象中的URI为重构后的URI,然后发出http请求,返回一个Observable对象。
然会会调用返回的Observable的toBlocking()把返回的Observable转成BlockingObservable,BlockingObservable会在订阅时阻塞当前线程,等待结果返回。
最后调用BlockingObservable的single()方法触发监听,阻塞等待结果返回。
LoadBalancerCommand#submit()
public Observable<T> submit(final ServerOperation<T> operation) {
...
Observable<T> o =
// selectServer()返回一个Observable
// Observable里面通过ILoadBalancer的chooseServer()方法选出一个实例
(server == null ? selectServer() : Observable.just(server))
// concatMap方法把前一个Observable发射的对象转换成另一个Observable
.concatMap(new Func1<Server, Observable<T>>() {
@Override
public Observable<T> call(Server server) {
...
// 接收到上面的server,创建一个Observable,然后又调了一边concatMap方法
Observable<T> o = Observable
.just(server)
.concatMap(new Func1<Server, Observable<T>>() {
@Override
public Observable<T> call(final Server server) {
...
// 这里调用外面传进来的ServerOperation
// ServerOperation的call(server)方法会进行重构URI,发出http请求
// ServerOperation的call(server)返回的也是一个Observable,这里调用Observable的doOnEach方法进行后续操作
return operation.call(server).doOnEach(new Observer<T>() {...});
...
return o;
}
});
...
// 定义当发生错误时的操作
return o.onErrorResumeNext(...);
}
selectServer()返回一个Observable,里面会通过ILoadBalancer的chooseServer()方法选出一个实例。
然后会调用Observable的concatMap方法把前一个Observable发射的server(负载均衡选出的实例)转换成下一个Observable。
下一个Observable里面又通过Observable.just(server)创建一个新的Observable,然后再次调用concatMap方法,这个concatMap方法的Func函数最后绕来绕去才调用到ServerOperation的call方法。
LoadBalancerCommand#selectServer()
private Observable<Server> selectServer() {
return Observable.create(new OnSubscribe<Server>() {
@Override
public void call(Subscriber<? super Server> next) {
try {
// 通过ILoadBalancer的chooseServer()方法选出一个实例
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
// 发射选出的实例
next.onNext(server);
next.onCompleted();
} catch (Exception e) {
next.onError(e);
}
}
});
}
调用LoadBalancerContext的getServerFromLoadBalancer()方法通过ILoadBalancer的chooseServer()方法选出一个示例。
然后调用Subscriber的onNext方法把选出的实例发射出去,被订阅方接收。
LoadBalancerContext#getServerFromLoadBalancer()
public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
...
// 获取Ribbon的ILoadBalancer负载均衡器对象
ILoadBalancer lb = getLoadBalancer();
...
// 调用ILoadBalancer的chooseServer()方法进行负载均衡,选出一个实例
Server svc = lb.chooseServer(loadBalancerKey);
...
}
进入到这里就可以看到熟悉的代码了,就是调用了Ribbon的ILoadBalancer接口的chooseServer()方法进行负载均衡,选出一个实例。
FeignLoadBalancer#execute()
ServerOperation的call()方法会调用FeignLoadBalancer#execute()方法发出http请求。
public RibbonResponse execute(RibbonRequest request, IClientConfig configOverride)
throws IOException {
...
// 默认会调用Client.Default#execute(),使用HttpURLConnection
Response response = request.client().execute(request.toRequest(), options);
return new RibbonResponse(request.getUri(), response);
}
FeignLoadBalancer的execute()发出http请求,request.client()返回某个Client接口的实现类,Client代表http客户端,是Feign定义的接口,有多个实现类,默认使用Client.Default,里面通过HttpURLConnection发出http请求。
Client.Default#execute()
public Response execute(Request request, Options options) throws IOException {
HttpURLConnection connection = convertAndSend(request, options);
return convertResponse(connection, request);
}
可以看到就是使用了HttpURLConnection发出http请求。
里面的代码就不看了,最后附上一张LoadBalancerFeignClient的源码流程图: