【深入理解SpringCloud微服务】Spring-Cloud-OpenFeign源码解析(下)——LoadBalancerFeignClient详解

news2025/1/7 21:14:35

【深入理解SpringCloud微服务】Spring-Cloud-OpenFeign源码解析(下)——LoadBalancerFeignClient详解

  • RxJava简单介绍
    • RxJava示例
    • Observable与Subscriber
    • 相关方法介绍
      • Observable.create(OnSubscribe)
      • Observable#just(T value)
      • Observable#concatMap(Func1<? super T, ? extends Observable<? extends R>> func)
      • Observable#toBlocking()
      • Observable#single()
      • Observer#onNext(T t)
      • Observer#onError(Throwable e)
      • Observer#onCompleted()
  • LoadBalancerFeignClient源码解析
    • LoadBalancerFeignClient#execute()
    • AbstractLoadBalancerAwareClient#executeWithLoadBalancer()
    • LoadBalancerCommand#submit()
    • LoadBalancerCommand#selectServer()
    • LoadBalancerContext#getServerFromLoadBalancer()
    • FeignLoadBalancer#execute()
    • Client.Default#execute()

RxJava简单介绍

由于LoadBalancerFeignClient里面使用到了RxJava ,因此我们要先了解一下RxJava ,RxJava 是一个在Java平台上实现响应式编程的库,用于处理异步数据流。

RxJava示例

public class RxJavaExample {

    public static void main(String[] args) {
        Observable.create(new Observable.OnSubscribe<String>() {
		    @Override
		    public void call(Subscriber<? super String> subscriber) {
		        subscriber.onNext("str1");
		        subscriber.onNext("str2");
		        subscriber.onNext("str3");
		        subscriber.onCompleted();
		    }
		}).subscribe(new Subscriber<String>() {
		    @Override
		    public void onCompleted() {
		        System.out.println("onCompleted");
		    }
		
		    @Override
		    public void onError(Throwable e) {
		        System.out.println("onError: " + e);
		    }
		
		    @Override
		    public void onNext(String s) {
		        System.out.println("onNext: " + s);
		    }
		});
    }
}

这段代码首先调用Observable.create(…)方法创建了一个Observable对象,方法参数OnSubscribe定义了Observable被订阅时触发的动作:直接发射一组预定义的字符串。然后调用subscribe(…)方法订阅这个Observable,并通过方法参数Subscriber定义了接收到发射的数据、发生错误时和完成时的回调方法。

在这里插入图片描述

Observable与Subscriber

Observable代表可观察的对象,可以被多个观察者订阅并接收通知,提供了subscribe()方法,用于观察者进行订阅,当被观察对象Observable发生变化时,通知观察者。

Subscriber实现了Observer接口,因此Subscriber就是观察者,具备了观察者模式中的观察者角色的能力,可以注册到被观察对象Observable上,以接收状态更新的通知。

在这里插入图片描述

相关方法介绍

我们看一下LoadBalancerFeignClient里面用到的RxJava的相关方法,其他的RxJava方法我们就不研究了,因为重点是LoadBalancerFeignClient的源码,而不是学习RxJava。

在这里插入图片描述

Observable.create(OnSubscribe)

创建一个Observable对象。Observable是RxJava中的一个类,它允许你创建异步数据流,并通过订阅的方式处理数据。在创建Observable时,通过实现OnSubscribe接口的call方法来定义数据的产生方式。

Observable#just(T value)

创建一个Observable对象,并指定value为其发射的对象。

Observable#concatMap(Func1<? super T, ? extends Observable<? extends R>> func)

concatMap方法用于将一个Observable发射的每个元素变成另一个Observable发射的一系列元素。而其中的Func1可以用lambda表达式定义,是一个函数,该函数接收一个前一个Observable发射的对象,返回一个新的Observable对象,并将这个Observable对象发射的元素序列与前一个Observable对象发射的元素序列进行合并。

Observable#toBlocking()

用于将Observable对象转换为BlockingObservable对象。 BlockingObservable是RxJava中的一个阻塞式观察者模式的实现,它可以在订阅时阻塞当前线程,直到被观察者对象发送完所有数据或发生错误。

Observable#single()

用于从Observable对象中获取单个元素并返回。它要求Observable对象只能发送一个元素,如果发送了多个元素,则会抛出异常。如果Observable对象没有发送任何元素,则会返回一个空值。single()方法内部会调用Observable的subcribe()方法触发监听。

Observer#onNext(T t)

Observer是Subscriber实现的接口,该方法用于被观察者为观察者发射要观察的对象。

Observer#onError(Throwable e)

该方法用于被观察者通知观察者出现了错误。

Observer#onCompleted()

该方法用于被观察者通知观察者已经完成所有对象的发射。

LoadBalancerFeignClient源码解析

LoadBalancerFeignClient其实就是干了以下三件事:

  1. 负载均衡:通过Ribbon的ILoadBalancer的chooseServer()方法选出一个实例
  2. 重写url:根据选出的实例重构URI
  3. 发起http请求:默认使用HttpURLConnection,返回Response对象

在这里插入图片描述

LoadBalancerFeignClient#execute()

	public Response execute(Request request, Request.Options options) throws IOException {
		try {
			...
			// 调用FeignLoadBalancer的executeWithLoadBalancer方法
			// 进行负载均衡,重写url,发出http请求
			return lbClient(clientName)
					.executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
		}
		catch (...) {...}
	}

lbClient(clientName)返回FeignLoadBalancer,然后调用FeignLoadBalancer的executeWithLoadBalancer方法,进入com.netflix.client.AbstractLoadBalancerAwareClient#executeWithLoadBalancer()

在这里插入图片描述

AbstractLoadBalancerAwareClient#executeWithLoadBalancer()

    public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    	// 创建一个负载均衡命令对象
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

        try {
        	// 调用命令对象的submit方法,返回一个Observable对象
        	// 里面会进行负载均衡,选出一个实例,然后会调用下面的ServerOperation的call方法。
            return command.submit(
                new ServerOperation<T>() {
                    @Override
                    public Observable<T> call(Server server) {
                    	// 根据负载均衡选出的实例,重构URI
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        // 替换请求对象中的URI为重构后的URI
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                        	// 发出http请求
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } 
                        catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                // 将Observable对象转换为BlockingObservable对象,可以在订阅时阻塞当前线程,直到被观察者对象发送完所有数据或发生错误。
                .toBlocking()
                // 从Observable对象中获取单个元素并返回
                // 里面会调用Observable的subscribe()方法真正触发监听
                .single();
        } catch (...) {...}
        
    }

首先创建一个LoadBalancerCommand对象,然后调用它的submit方法,返回一个Observable对象。这个Observable当被监听时,会进行负载均衡,选出一个实例,然后会调用ServerOperation的call方法。

ServerOperation的call方法里面根据负载均衡选出的实例重构URI,替换请求对象中的URI为重构后的URI,然后发出http请求,返回一个Observable对象。

然会会调用返回的Observable的toBlocking()把返回的Observable转成BlockingObservable,BlockingObservable会在订阅时阻塞当前线程,等待结果返回。

最后调用BlockingObservable的single()方法触发监听,阻塞等待结果返回。

在这里插入图片描述

LoadBalancerCommand#submit()

    public Observable<T> submit(final ServerOperation<T> operation) {
        ...

        Observable<T> o = 
        		// selectServer()返回一个Observable
        		// Observable里面通过ILoadBalancer的chooseServer()方法选出一个实例
                (server == null ? selectServer() : Observable.just(server))
                // concatMap方法把前一个Observable发射的对象转换成另一个Observable
                .concatMap(new Func1<Server, Observable<T>>() {
                    @Override
                    public Observable<T> call(Server server) {
                        ...
                        // 接收到上面的server,创建一个Observable,然后又调了一边concatMap方法
                        Observable<T> o = Observable
                                .just(server)
                                .concatMap(new Func1<Server, Observable<T>>() {
                                    @Override
                                    public Observable<T> call(final Server server) {
                                        ...
                                        // 这里调用外面传进来的ServerOperation
                                        // ServerOperation的call(server)方法会进行重构URI,发出http请求
                                        // ServerOperation的call(server)返回的也是一个Observable,这里调用Observable的doOnEach方法进行后续操作
                                        return operation.call(server).doOnEach(new Observer<T>() {...});
                        
                        ...
                        return o;
                    }
                });
            
        ...
        // 定义当发生错误时的操作
        return o.onErrorResumeNext(...);
    }

selectServer()返回一个Observable,里面会通过ILoadBalancer的chooseServer()方法选出一个实例。

然后会调用Observable的concatMap方法把前一个Observable发射的server(负载均衡选出的实例)转换成下一个Observable。

下一个Observable里面又通过Observable.just(server)创建一个新的Observable,然后再次调用concatMap方法,这个concatMap方法的Func函数最后绕来绕去才调用到ServerOperation的call方法。

在这里插入图片描述

LoadBalancerCommand#selectServer()

    private Observable<Server> selectServer() {
        return Observable.create(new OnSubscribe<Server>() {
            @Override
            public void call(Subscriber<? super Server> next) {
                try {
                	// 通过ILoadBalancer的chooseServer()方法选出一个实例
                    Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                    // 发射选出的实例
                    next.onNext(server);
                    next.onCompleted();
                } catch (Exception e) {
                    next.onError(e);
                }
            }
        });
    }

调用LoadBalancerContext的getServerFromLoadBalancer()方法通过ILoadBalancer的chooseServer()方法选出一个示例。

然后调用Subscriber的onNext方法把选出的实例发射出去,被订阅方接收。

在这里插入图片描述

LoadBalancerContext#getServerFromLoadBalancer()

    public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
        ...
        // 获取Ribbon的ILoadBalancer负载均衡器对象
        ILoadBalancer lb = getLoadBalancer();
        ...
        		// 调用ILoadBalancer的chooseServer()方法进行负载均衡,选出一个实例
                Server svc = lb.chooseServer(loadBalancerKey);
                ...
    }

进入到这里就可以看到熟悉的代码了,就是调用了Ribbon的ILoadBalancer接口的chooseServer()方法进行负载均衡,选出一个实例。

在这里插入图片描述

FeignLoadBalancer#execute()

ServerOperation的call()方法会调用FeignLoadBalancer#execute()方法发出http请求。

	public RibbonResponse execute(RibbonRequest request, IClientConfig configOverride)
			throws IOException {
		...
		// 默认会调用Client.Default#execute(),使用HttpURLConnection
		Response response = request.client().execute(request.toRequest(), options);
		return new RibbonResponse(request.getUri(), response);
	}

FeignLoadBalancer的execute()发出http请求,request.client()返回某个Client接口的实现类,Client代表http客户端,是Feign定义的接口,有多个实现类,默认使用Client.Default,里面通过HttpURLConnection发出http请求。

在这里插入图片描述

在这里插入图片描述

Client.Default#execute()

    public Response execute(Request request, Options options) throws IOException {
      HttpURLConnection connection = convertAndSend(request, options);
      return convertResponse(connection, request);
    }

可以看到就是使用了HttpURLConnection发出http请求。

在这里插入图片描述

里面的代码就不看了,最后附上一张LoadBalancerFeignClient的源码流程图:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2050185.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

实战OpenCV之图像显示

基础入门 OpenCV提供的功能非常多&#xff0c;图像显示是最基础也是最直观的一部分。它让我们能够直观地看到算法处理后的效果&#xff0c;对于调试和验证都至关重要。在OpenCV中&#xff0c;图像显示主要依赖于以下四个关键的数据结构和函数。 1、Mat类。这是OpenCV中最基本的…

文心快码(Baidu Comate)快速创建数据可视化图表

给你分享一个免费的编码助手——文心快码 Baidu Comate&#xff01;百度文心大模型&#xff0c;46%采纳率&#xff0c;百度30%的代码都是它写的&#xff01;AI这个大腿&#xff0c;你确定不抱一下&#xff1f;快来安装使用吧&#xff0c;送京东卡&#xff01; https://dwz.cn/3…

高校疫情防控web系统pf

TOC springboot365高校疫情防控web系统pf 第1章 绪论 1.1 课题背景 互联网发展至今&#xff0c;无论是其理论还是技术都已经成熟&#xff0c;而且它广泛参与在社会中的方方面面。它让信息都可以通过网络传播&#xff0c;搭配信息管理工具可以很好地为人们提供服务。所以各行…

平移矩阵、点绕轴的旋转矩阵、平面直角坐标系旋转矩阵、点绕向量旋转公式(罗德里格斯旋转公式)

平移矩阵 点绕轴的旋转矩阵 平面直角坐标系旋转矩阵 点绕向量旋转公式(罗德里格斯旋转公式) 代码 #include "myPoint.h" #include <cmath> myPoint::myPoint() {m_x m_y m_z 0; }myPoint::myPoint(double x, double y, double z):m_x(x),m_y(y),m_z(z) { }…

探索tailwindcss多主题切换

现在的多主题切换基本上都是用的 css 变量的形式, 而tailwindcss也支持 css 变量定义主题的方式 至于为什么用 tailwindcss变量, 还是因为 tailwind 写类名提示比较方便, 也不需要再在css或者style中去一个个var的形式去写变量了 这里我在assets/style/theme文件夹中创建了三个…

智能与生产力、生产关系的关系

机器学习和自主系统是推动新质生产力和新质生产关系形成的关键技术。它们与这两个概念之间的关系可以从以下几个方面进行分析&#xff1a; 一、机器学习与新质生产力 提升效率和精准度&#xff1a;机器学习通过对大量数据进行分析&#xff0c;能够提供精准的预测和决策支持。这…

MyBatis(初阶)

1.什么是MyBtis MyBatis是持久层框架&#xff0c;⽤于简化JDBC的开发。 2.准备工作 2.1 创建⼯程 数据库: 2.2 配置数据库连接字符串 以application.yml⽂件为例: 2.3 写持久层代码 Data public class UserInfo {private Integer id;private String username;private Stri…

YOLOv10训练,适合小白训练,新手YOLOv10训练自己数据集教程!超简单,超详细!!

YOLOv10训练&#xff0c;适合小白训练&#xff0c;新手YOLOv10训练自己数据集教程&#xff01;超简单&#xff0c;超详细&#xff01;&#xff01; AI学术叫叫兽在这&#xff01;家人们&#xff0c;给我遥遥领先&#xff01;&#xff01;&#xff01; 方法一&#xff1a;云服务…

如何打造一款爆款手游?

现在开发一款游戏太简单了&#xff0c;各种源码满地飞&#xff0c;大家拿过来随便改改有个版号就可以上线运营了&#xff0c; 但是这种的游戏品质一般都不会怎么样&#xff0c;留存的周期也是比较短的&#xff0c;更别说让玩家持续消费了&#xff0c;想要打造一款火热的游戏我们…

Android Media Framework(十八)ACodec - Ⅵ

ACodec之所以复杂&#xff0c;主要是因为状态太多。在上一篇文章中&#xff0c;我们学习了在ExecutingState下对buffer的处理。ExecutingState可能会切换到OutputPortSettingsChangedState、FlushingState&#xff0c;或者当组件被释放时&#xff0c;进入UninitializedState。接…

泛微云桥前台文件上传漏洞-202408

漏洞简介 2024 年 8 月份新出漏洞&#xff0c;泛微云桥任意文件上传漏洞&#xff0c;详情如图所示。 环境搭建 1、下载漏洞环境。 https://wx.weaver.com.cn/download 2、运行install64.bat&#xff0c;安装环境。 3、安装成功界面。 未安装补丁&#xff0c;系统不能使用…

Java方法01:什么是方法

本节视频链接&#xff1a;Java方法01&#xff1a;什么是方法&#xff1f;_哔哩哔哩_bilibilihttps://www.bilibili.com/video/BV12J41137hu?p45&vd_sourceb5775c3a4ea16a5306db9c7c1c1486b5 Java中的‌方法‌是一段执行特定任务的代码片段&#xff0c;‌它是程序的基本构…

Keepalived:不只是心跳检测,更是高可用性的秘密武器

keepalived博客(Keepalived&#xff1a;不只是心跳检测&#xff0c;更是高可用性的秘密武器) 文章目录 keepalived博客(**Keepalived&#xff1a;不只是心跳检测&#xff0c;更是高可用性的秘密武器**)keepalived介绍概述工作原理核心模块应用场景配置与安装总结 keepalived基本…

工 厂设计模式

简单工厂模式 基本介绍 1) 简单工厂模式是属于创建型模式,是工厂模式的一种。 简单工厂模式是由一个工厂对象决定创建出哪一 种产品类 的实例。简单工厂模式是工厂模式家族中最简单实用的模式 2) 简单工厂模式:定义了一个创建对象的类,由这个类来 封装实例化对象的行为 (代…

从零开始学cv-6:图像的灰度变换

文章目录 一&#xff0c;简介&#xff1a;二、图像的线性变换三、分段线性变换四&#xff0c;非线性变换4.1 对数变换4.2 Gamma变换 五&#xff0c;效果: 一&#xff0c;简介&#xff1a; 图像灰度变换涉及对图像中每个像素的灰度值执行数学运算&#xff0c;进而调整图像的视觉…

Python基础和变量使用

1. 基础了解 1.1 运行方式 Python有多种运行方式&#xff0c;以下是几种常见的执行Python代码的方法&#xff1a; 交互式解释器&#xff1a; 打开终端或命令提示符&#xff0c;输入python或python3&#xff08;取决于你的系统配置&#xff09;&#xff0c;即可进入Python交互…

HelpLook AI 知识库:为企业提供高效智能的知识管理解决方案

“管理就是把复杂的问题简单化&#xff0c;混乱的事情规范化。” 在当今竞争激烈的商业环境中&#xff0c;企业面临着快速变化的市场需求和日益复杂的业务流程。为了保持竞争力并提升运营效率&#xff0c;选择一款合适的知识管理系统至关重要。在众多选项中&#xff0c;HelpLoo…

day05--Vue

一、Vue入门 1.1入门案例 1.在页面中引入vue.js框架 2.定义vue对象 let app new Vue({ el:"#vue作用域的div标签id", data:{ //所有数据模型 }&#xff0c; methods:{ //页面中所有触发的js方法 }&#xff0c; created(){ //页面初始化&#xff0c;准备调用方法 } …

MODELSIM仿真报错解决记录

目录 问题&#xff1a;Modelsim报错&#xff1a;Error (10228): Verilog HDL error at Line_Shift_RAM_1Bit.v(39): module “Line_Shift_RAM_1 原因&#xff1a;创建的IP核放到了别的位置 解决方法&#xff1a;删掉IP核以及QIP等文件&#xff0c;将IP核创建到工程目录下 问…

vue3旋转木马型轮播图,环型滚动

<template><div><div class"content"><div class"but1" click"rotateLeft">--向左</div><div class"ccc"><main id"main"><div class"haha" ref"haha"&g…