Idea+maven+spring-cloud项目搭建系列--11-3 dubbo限流和熔断

news2025/1/16 4:53:47

前言: dubbo 作为rpc 通信组件,在使用过程中,如何避免服务提供端被多个消费端撑爆,消费端如何避免因为服务端的故障造成结果响应超时。

1 服务提供端的限流措施:

1.1 使用 :dubbo.protocol.accepts 参数限制服务端同时链接消费端的连接数
在这里插入图片描述
0 表示不限制连接数;

1.2 使用: dubbo.provider.executes 限制每个方法并行处理的最大可并行执行请求:
在这里插入图片描述
也可以在类级别进行定义从而覆盖全局的配置:@DubboService(executes = 2)

2 消费端提供的熔断措施:

2.1 服务端在代理类中定义mock 的返回值:

@DubboReference(mock = "return null")
private DubboThreeService threeService;
@RequestMapping(value = "/dubbo-three", method = RequestMethod.GET)
public String indexTestThree() {
    Object obj =threeService.testOne("123");
   return String.valueOf(obj);
}

当服务端异常时 方法直接返回null 值;

测试类DubboThreeService:

public interface DubboThreeService {
    String testOne(String token);
}

2.2 服务端定义异常抛出类:

@DubboReference(mock = "throw org.lgx.bluegrass.api.common.BizException")
private DubboThreeService threeService;

异常类:BizException

@Data
public class BizException extends RuntimeException {

    private static final long serialVersionUID = 1L;

    protected int errorCode ;
    protected String errorMessage;
    private Map errorMap;

    public BizException() {
        super();
        this.errorCode =500_001;
        this.errorMessage = "dubbo exception";

    }
}

2.3 对于单个代理类进行处理:
1) 定义mock 的类:

@DubboReference(mock = "org.lgx.bluegrass.api.service.impl.DubboTestServiceImpl")
private DubboTestService  dubboTestService;

测试类:DubboTestService

public interface DubboTestService {
    // 暴露dubbo 服务
    String test(String token);

}

用本服务中的实现类代替进行结果返回:

public class DubboTestServiceImpl implements DubboTestService {
    @Override
    public String test(String token) {
        return "hello !";
    }
}

2) 在DubboTestService 接口类同层级包中定义类 DubboThreeServiceMock
mock 类名字规则:代理的接口类名+Mock

public class DubboThreeServiceMock implements DubboThreeService {
    @Override
    public String testOne(String token) {
        return "mock hello";
    }
}

代理类中mock 就可以省去默认实现的类路径:

@DubboReference(mock = "true")
private DubboThreeService threeService;

3 限流的实现:

请求并发限制:dubbo.provider.executes:ExecuteLimitFilter

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.dubbo.rpc.filter;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcStatus;
import org.apache.dubbo.rpc.Filter.Listener;

@Activate(
    group = {"provider"},
    value = {"executes"}
)
public class ExecuteLimitFilter implements Filter, Listener {
    private static final String EXECUTE_LIMIT_FILTER_START_TIME = "execute_limit_filter_start_time";

    public ExecuteLimitFilter() {
    }

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    	// 服务路径
        URL url = invoker.getUrl();
        // 调用的方法名
        String methodName = invocation.getMethodName();
        // 获取方法定义的最大并发量如果没有设置则 会取integer的最大值
        int max = url.getMethodParameter(methodName, "executes", 0);
        // 判断当前方法正在并发执行的请求数量,如果超过最大数量则直接抛出异常
        if (!RpcStatus.beginCount(url, methodName, max)) {
            throw new RpcException(7, "Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
        } else {
            invocation.put("execute_limit_filter_start_time", System.currentTimeMillis());

            try {
                return invoker.invoke(invocation);
            } catch (Throwable var7) {
                if (var7 instanceof RuntimeException) {
                    throw (RuntimeException)var7;
                } else {
                    throw new RpcException("unexpected exception when ExecuteLimitFilter", var7);
                }
            }
        }
    }

    public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
    	// 方法调用完成,并发数量-1
        RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), this.getElapsed(invocation), true);
    }

    public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
        if (t instanceof RpcException) {
            RpcException rpcException = (RpcException)t;
            if (rpcException.isLimitExceed()) {
                return;
            }
        }
		// 方法调用完成,并发数量-1
        RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), this.getElapsed(invocation), false);
    }

    private long getElapsed(Invocation invocation) {
        Object beginTime = invocation.get("execute_limit_filter_start_time");
        return beginTime != null ? System.currentTimeMillis() - (Long)beginTime : 0L;
    }
}

RpcStatus.beginCount 判断方法:

private static final ConcurrentMap<String, RpcStatus> SERVICE_STATISTICS = new ConcurrentHashMap();
private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS =
 new ConcurrentHashMap();
public static boolean beginCount(URL url, String methodName, int max) {
    max = max <= 0 ? 2147483647 : max;
    // 应用级别的RpcStatus 
     RpcStatus appStatus = getStatus(url);
     // 方法级别的RpcStatus 
     RpcStatus methodStatus = getStatus(url, methodName);
     if (methodStatus.active.get() == 2147483647) {
         return false;
     } else {
         int i;
         do {
         	// 获取此时并行执行的方法数量,大于定义的则返回false
             i = methodStatus.active.get();
             if (i + 1 > max) {
                 return false;
             }
         } while(!methodStatus.active.compareAndSet(i, i + 1));

         appStatus.active.incrementAndGet();
         return true;
     }
 }
public static RpcStatus getStatus(URL url) {
 String uri = url.toIdentityString();
 // 应用级别记录的 RpcStatus
 return (RpcStatus)SERVICE_STATISTICS.computeIfAbsent(uri, (key) -> {
     return new RpcStatus();
 });
}

public static void removeStatus(URL url) {
 String uri = url.toIdentityString();
 SERVICE_STATISTICS.remove(uri);
}

public static RpcStatus getStatus(URL url, String methodName) {
 String uri = url.toIdentityString();
 
 ConcurrentMap<String, RpcStatus> map = (ConcurrentMap)METHOD_STATISTICS.computeIfAbsent(uri, (k) -> {
     return new ConcurrentHashMap();
 });
 // 方法级别记录的RpcStatus
 return (RpcStatus)map.computeIfAbsent(methodName, (k) -> {
     return new RpcStatus();
 });
    

4 熔断的实现:
熔断主要通过mock 数据模拟进行处理:MockClusterInvoker:

public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
    // 获取mock 的值
    String value = this.getUrl().getMethodParameter(invocation.getMethodName(), "mock", Boolean.FALSE.toString()).trim();
    // 如果没有定义mock ,或者定义为false 则直接调用远程的方法
    if (value.length() != 0 && !"false".equalsIgnoreCase(value)) {
    	// 如果定义强制走本服务的方法实现,则直接走本服务
        if (value.startsWith("force")) {
            if (logger.isWarnEnabled()) {
                logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + this.getUrl());
            }

            result = this.doMockInvoke(invocation, (RpcException)null);
        } else {
            try {
            	// 不强制走本地服务,则发起远程调用
                result = this.invoker.invoke(invocation);
                if (result.getException() != null && result.getException() instanceof RpcException) {
                    RpcException rpcException = (RpcException)result.getException();
                    if (rpcException.isBiz()) {
                        throw rpcException;
                    }
					// 如果远程调用失败,则发起本地服务调用
                    result = this.doMockInvoke(invocation, rpcException);
                }
            } catch (RpcException var5) {
            	// 方法调用异常
                if (var5.isBiz()) {
                    throw var5;
                }

                if (logger.isWarnEnabled()) {
                    logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + this.getUrl(), var5);
                }
				// 发起本地服务调用
                result = this.doMockInvoke(invocation, var5);
            }
        }
    } else {
    	// 远程方法的调用
        result = this.invoker.invoke(invocation);
    }

    return result;
}

MockInvoker 本地服务方法实现:

public Result invoke(Invocation invocation) throws RpcException {
    if (invocation instanceof RpcInvocation) {
        ((RpcInvocation)invocation).setInvoker(this);
    }
// 获取mock 的定义
    String mock = null;
    if (this.getUrl().hasMethodParameter(invocation.getMethodName())) {
        mock = this.getUrl().getParameter(invocation.getMethodName() + "." + "mock");
    }

    if (StringUtils.isBlank(mock)) {
        mock = this.getUrl().getParameter("mock");
    }

    if (StringUtils.isBlank(mock)) {
        throw new RpcException(new IllegalAccessException("mock can not be null. url :" + this.url));
    } else {
        mock = normalizeMock(URL.decode(mock));
        // 如果定义了 return 则取return 的值进行返回
        if (mock.startsWith("return ")) {
            mock = mock.substring("return ".length()).trim();

            try {
                Type[] returnTypes = RpcUtils.getReturnTypes(invocation);
                Object value = parseMockValue(mock, returnTypes);
                return AsyncRpcResult.newDefaultAsyncResult(value, invocation);
            } catch (Exception var5) {
                throw new RpcException("mock return invoke error. method :" + invocation.getMethodName() + ", mock:" + mock + ", url: " + this.url, var5);
            }
        } else if (mock.startsWith("throw")) {
        	// 定义直接抛出异常
            mock = mock.substring("throw".length()).trim();
            if (StringUtils.isBlank(mock)) {
                throw new RpcException("mocked exception for service degradation.");
            } else {
                Throwable t = getThrowable(mock);
                throw new RpcException(3, t);
            }
        } else {
            try {
            	// 走本地服务的调用
                Invoker<T> invoker = this.getInvoker(mock);
                return invoker.invoke(invocation);
            } catch (Throwable var6) {
                throw new RpcException("Failed to create mock implementation class " + mock, var6);
            }
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/402270.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

面试热点题:回溯算法之组合 组合与组合总和 III

什么是回溯算法&#xff1f; 回溯算法也可以叫回溯搜索算法&#xff0c;回溯是递归的"副产品",回溯的本质是穷举&#xff0c;然后选出我们需要的数据&#xff0c;回溯本身不是特别高效的算法&#xff0c;但我们可以通过"剪枝"来优化它。 理解回溯算法 回溯…

电脑游戏怎么录屏?其实很简单,只需要简单3步

电脑游戏一直是游戏爱好者最热衷的游戏之一。但是&#xff0c;有时候我们想分享我们在游戏中的精彩时刻&#xff0c;或者记录我们的游戏过程以便后续观看和学习。在这种情况下&#xff0c;录屏就成了必不可少的工具。但是&#xff0c;许多人可能不知道电脑游戏怎么录屏。在本文…

逆向分析——壳

你脑海中的壳是什么 壳在自然界是动物的保护壳&#xff0c;软件同样有保护壳&#xff0c;为了防止破解 也许大海给贝壳下的定义是珍珠&#xff0c;也许时间给煤炭下的定义是钻石 ——沙与沫 壳的由来 在DOS时代&#xff0c;壳一般指的是磁盘加密软件中的一段加密程序 后来发展…

APM新添加UAVCAN设备

简介 UAVCAN是一种轻量级协议,旨在通过CAN总线在航空航天和机器人应用中实现可靠通信。要实现通信&#xff0c;最基本需要data_type_ id, signature、数据结构、设备程序初始化。 添加设备数据结构文件(.uavcan格式) 1.在以下路径添加设备数据结构文件&#xff0c;根据设备类…

三体到底是啥?用Python跑一遍就明白了

文章目录拉格朗日方程推导方程组微分方程算法化求解画图动图绘制温馨提示&#xff0c;只想看图的画直接跳到最后一节拉格朗日方程 此前所做的一切三体和太阳系的动画&#xff0c;都是基于牛顿力学的&#xff0c;而且直接对微分进行差分化&#xff0c;从而精度非常感人&#xf…

Web漏洞-CSRF漏洞

CSRF漏洞介绍&#xff1a;CSRF&#xff08;Cross-Site Request Forgery&#xff09;&#xff0c;中文名称&#xff1a;跨站请求伪造&#xff0c;是一种劫持用户在当前已登录的Web应用程序上执行非本意操作一种攻击.原理&#xff1a;攻击者利用目标用户的身份&#xff0c;执行某…

基于Stackelberg博弈的光伏用户群优化定价模型(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

keras学习之回调函数的使用

回调函数 回调函数是一个对象&#xff08;实现了特定方法的类实例&#xff09;&#xff0c;它在调用fit()时被传入模型&#xff0c;并在训练过程中的不同时间点被模型调用可以访问关于模型状态与模型性能的所有可用数据模型检查点&#xff08;model checkpointing&#xff09;…

【SAP PO】X-DOC:SAP PO 接口配置 REST 服务对接填坑记

X-DOC&#xff1a;SAP PO 接口配置 REST 服务对接填坑记1、背景2、PO SLD配置3、PO https证书导入1、背景 &#xff08;1&#xff09;需求背景&#xff1a; SAP中BOM频繁变更&#xff0c;技术人员在对BOM进行变更后&#xff0c;希望及时通知到相关使用人员 &#xff08;2&…

配天智造自主原创数字工厂:百余名员工人均创收122万

配天智造&#xff08;832223&#xff09;2022年度报告显示&#xff0c;报告期内公司实现营业收入1.3亿元&#xff0c;同比增长52%&#xff0c;归属于挂牌公司股东的净利润3867万元&#xff0c;同比增长28.11%。而这家公司全部在职员工仅有107人&#xff0c;人均创收约为122万。…

计算机科学导论笔记(七)

目录 九、程序设计语言 9.1 演化 9.1.1 机器语言 9.1.2 汇编语言 9.1.3 高级语言 9.2 翻译 9.2.1 编译 9.2.2 解释 9.2.3 翻译过程 9.3 编程模式 9.3.1 面向过程模式 9.3.2 面向对象模式 9.3.3 函数式模式 9.3.4 声明式模式 9.4 共同概念 九、程序设计语言 9.1 …

Spring Cloud Alibaba全家桶(六)——微服务组件Sentinel介绍与使用

前言 本文小新为大家带来 微服务组件Sentinel介绍与使用 相关知识&#xff0c;具体内容包括分布式系统存在的问题&#xff0c;分布式系统问题的解决方案&#xff0c;Sentinel介绍&#xff0c;Sentinel快速开始&#xff08;包括&#xff1a;API实现Sentinel资源保护&#xff0c;…

ABAQUS免费培训 Abaqus成型 焊接 疲劳多工况课程

一、详解Abaqus多工况分析在工程中&#xff0c;多工况的情况是普遍存在的情况&#xff0c;而单工况孤立存在是十分理想状态下的假设。例如我们在进行强度分析时&#xff0c;都是假设其本身是不存在应力的&#xff0c;然后基于这种无初始应力下的计算&#xff0c;使得我们不得不…

aop实现接口访问频率限制

引言 项目开发中我们有时会用到一些第三方付费的接口&#xff0c;这些接口的每次调用都会产生一些费用&#xff0c;有时会有别有用心之人恶意调用我们的接口&#xff0c;造成经济损失&#xff1b;或者有时需要对一些执行时间比较长的的接口进行频率限制&#xff0c;这里我就简…

OpenGL超级宝典学习笔记:纹理

前言 本篇在讲什么 本篇章记录对OpenGL中纹理使用的学习 本篇适合什么 适合初学OpenGL的小白 本篇需要什么 对C语法有简单认知 对OpenGL有简单认知 最好是有OpenGL超级宝典蓝宝书 依赖Visual Studio编辑器 本篇的特色 具有全流程的图文教学 重实践&#xff0c;轻理…

MP4文件播放不了是什么原因?原因及解决办法分享!

为什么mp4文件播放不了&#xff1f;常见的有三种原因&#xff0c;可能是由于视频流或音频流不兼容导致&#xff0c;可能是由于视频文件损坏&#xff0c;也可能是因为电脑上缺乏编解码器。下面小编根据mp4文件无法播放的三种可能进行针对性解答。 原因一&#xff1a;视频流或音频…

基于SSM的学生竞赛模拟系统

基于SSM的学生竞赛模拟系统 ✌全网粉丝20W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取项目下载方式&#x1f345; 一、项目背景介绍&#x…

DPU54国产全速USB1.1HUB控制器芯片替代AU9254

目录DPU54简介结构框图DPU54主要特性性能特点典型应用领域DPU54简介 DPU54是高性能、低功耗4口全速 USB1.1 HUB 控制器芯片&#xff0c;上行端口兼容全速 12MHz 模式&#xff0c;4 个下行端口兼容全速 12MHz、低速 1.5MHz 两种模式。 DPU54采用状态机单事务处理架构&#xff0…

windows 11系统,通过ip地址远程连接连接ubuntu 22.04系统(共同局域网下,另一台主机不需要联网)

windows 11系统&#xff0c;通过ip地址远程连接连接ubuntu 22.04系统&#xff08;不需要联网&#xff09;问题来源问题分析解决方案问题来源 自己搭建了一台ubuntu系统作为深度学习的机器&#xff0c;但是学校的网络问题&#xff0c;一个账号只能同时登录3台设备。通过远程连接…

C#完全掌握控件之-combbox

无论是QT还是VC&#xff0c;这些可视化编程的工具&#xff0c;掌握好控件的用法是第一步&#xff0c;C#的控件也不例外&#xff0c;尤其这些常用的控件。常见控件中较难的往往是这些与数据源打交道的&#xff0c;比如CombBox、ListBox、ListView、TreeView、DataGridView. 文章…