Dubbo源码解析-Dubbo的线程模型(九)

news2024/11/25 13:17:28

一、Dubbo线程模型

首先明确一个基本概念:IO 线程和业务线程的区别
IO 线程:配置在netty 连接点的用于处理网络数据的线程,主要处理编解码等直接与网络数据
打交道的事件。
业务线程:用于处理具体业务逻辑的线程,可以理解为自己在provider 上写的代码所执行的线
程环境。

Dubbo 默认采用的是长链接的方式,即默认情况下一个consumer 和一个provider 之间只会建立
一条链接,这种情况下IO 线程的工作就是编码和解码数据,监听具体的数据请求,直接通过Channel发布数据等等;

有两个参数⽤来配置服务消费者和服务提供者直接的socket连接个数:

1. shareconnections:表示可共享的socket连接个数
2. connections:表示不共享的socket连接个数

服务A的shareconnections或者connections为2时,服务A的消费者会向服务A的提供者建⽴两个socket连接:

业务线程就是处理IO 线程处理之后的数据,业务线程并不知道任何跟网络相关的内容,只是纯
粹的处理业务逻辑,在业务处理逻辑的时候往往存在复杂的逻辑,所以业务线程池的配置往往都要
比IO 线程池的配置大很多。
IO 线程部分,Netty 服务提供方NettyServer 又使用了两级线程池,master 主要用来接受客户
端的链接请求,并把接受的请求分发给worker 来处理。整个过程如下图:

IO 线程与业务线程的交互如下:

IO 线程的派发策略:

默认是all:所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。即worker 线程接收到事件后,将该事件提交到业务线程池中,自己再去处理其他IO 事件。
direct:worker 线程接收到事件后,由worker 执行到底。
message:只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在IO 线程上执行
execution:只有请求消息派发到线程池,不含响应(客户端线程池),响应和其它连接断开事件,心跳等消息,直接在IO线程上执行
connection:在IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。

业务线程池设置:
fixed:固定大小线程池,启动时建立线程,不关闭,一直持有。(缺省)
coresize:200
maxsize:200
队列:SynchronousQueue
回绝策略:AbortPolicyWithReport - 打印线程信息jstack,之后抛出异常
cached:缓存线程池,空闲一分钟自动删除,需要时重建。
limited:可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。

配置示例:

<dubbo:protocol name="dubbo"dispatcher="all"threadpool="fixed"threads="100"/>

在整个消费者调用过程中,各个线程池都比较重要,其中比较有特色的就是AllChannelHandler,它完成了IO线程转向用户线程的任务转移,比较关键。 

二、派发策略(All)源码解析

消费者启动的时候会执行DubboProtocol#initClient建议与服务端端的socket连接

private ExchangeClient initClient(URL url) {

	ExchangeClient client;
	try {
		// Replace InstanceAddressURL with ServiceConfigURL.
		url = new ServiceConfigURL(DubboCodec.NAME, url.getUsername(), url.getPassword(), url.getHost(), url.getPort(), url.getPath(),  url.getParameters());
		// connection should be lazy
		if (url.getParameter(LAZY_CONNECT_KEY, false)) {
			client = new LazyConnectExchangeClient(url, requestHandler);
		} else {
			client = Exchangers.connect(url, requestHandler);
		}

	} catch (RemotingException e) {
		throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
	}

	return client;
}
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
	return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

Transporters#connect最终返回的是NettyClient,点进这个对象的构造方法

public Client connect(URL url, ChannelHandler handler) throws RemotingException {
	return new NettyClient(url, handler);
}
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
	// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
	// the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
	super(url, wrapChannelHandler(url, handler));
}

NettyClient#wrapChannelHandler中再次利用SPI机制获取线程派发策略,dubbo默认的策略为allDispatcher策略。

protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
	return new MultiMessageHandler(new HeartbeatHandler(url.getOrDefaultFrameworkModel().getExtensionLoader(Dispatcher.class)
			.getAdaptiveExtension().dispatch(handler, url)));
}
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
	return new AllChannelHandler(handler, url);
}

 除了默认的AllDispatcher,还有DirectDispatcher,MessageOnlyDispatcher等。

 当消费端接收到远程服务端的响应之后,按照Netty的处理流程,消息会在channel绑定的handler上传递,netty底层会调用handler#received。可以看到connected,disconnected,received,caught等方法都是在新的线程池ExecutorService中执行,executor.execute方法会将任务ChannelEventRunnable提交到ExecutorService中。

public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        ExecutorService executor = getSharedExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService executor = getSharedExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if(message instanceof Request && t instanceof RejectedExecutionException){
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getSharedExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}

由于AllChannelHandler方法是在前面handler的基础上包装了一层,所以ChannelEventRunnable中会将消息传递给AllHandlel的下一个handler,从这里也清晰的看到了AllChannelHandler完成了IO线程转向用户线程的任务转移。

public void run() {
	if (state == ChannelState.RECEIVED) {
		try {
			handler.received(channel, message);
		} catch (Exception e) {
			logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
					+ ", message is " + message, e);
		}
	} else {
		switch (state) {
		case CONNECTED:
			try {
				handler.connected(channel);
			} catch (Exception e) {
				logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
			}
			break;
		case DISCONNECTED:
			try {
				handler.disconnected(channel);
			} catch (Exception e) {
				logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
			}
			break;
		case SENT:
			try {
				handler.sent(channel, message);
			} catch (Exception e) {
				logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
						+ ", message is " + message, e);
			}
			break;
		case CAUGHT:
			try {
				handler.caught(channel, exception);
			} catch (Exception e) {
				logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
						+ ", message is: " + message + ", exception is " + exception, e);
			}
			break;
		default:
			logger.warn("unknown state: " + state + ", message is " + message);
		}
	}

}

三、AllDispatcher策略异常超时问题

Dubbo有一个经典问题,就是当配置了消息派发策略为AllDispatcher时,当服务端线程池满了之后,当消费端再次发送请求,就会一直傻傻等待超时导致没有任何服务端响应。那么问题就出现在AllChannelHandler,前面已经说了AllDispatcher策略就是所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。即worker 线程接收到事件后,将该事件提交到业务线程池中,自己再去处理其他IO 事件。

问题出现原因:

那么当服务端线程池打满之后,此时又再次来了一个请求,此时依然会提交给线程池执行,那么了解线程池原理的就清楚线程池任务满了之后会执行拒绝策略抛出RejectedExecutionException异常,此时就会进入到received的catch方法中去,然后就又再次抛出ExecutionException异常。

            
public void received(Channel channel, Object message) throws RemotingException {
	ExecutorService executor = getPreferredExecutorService(message);
	try {
		executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
	} catch (Throwable t) {
		
		throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
	}
}

那么抛出的异常就又会被netty捕获,进而继续执行nettyHandler的caught方法,可以看到这里又再次将任务丢到了线程池中。但是此时线程池依然是满的,业务线程池所有线程都堵住了,所以也不能将异常消息返回给客户端,然后客户端消费者只能傻傻等到超时。

public void caught(Channel channel, Throwable exception) throws RemotingException {
	ExecutorService executor = getSharedExecutorService();
	try {
		executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
	} catch (Throwable t) {
		throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
	}
}

解决办法可以设置dispatcher为message,只有请求和响应交给业务线程池处理,其他的在IO线程处理,配置如下:

<dubbo:protocol name="dubbo" dispatcher="message" />

后面dubbo也修复了这个问题,再received方法的catch中新加了一部分逻辑,注释的大致意思也就是说:修复当线程池满了之后异常信息无法被发送给消费端的问题(当线程池满了,拒绝执行任务,会引起消费端等待超时),所以代码中判断了下当抛出异常为RejectedExecutionException时,就不把异常抛出交给AllChannelHandler#caught方法中的线程池执行,而是直接用IO线程在通过channel将消息及时反馈给消费者,消费者也就会收到服务端的“threadpool is exhausted ,detail msg”等响应消息。

public void received(Channel channel, Object message) throws RemotingException {
	ExecutorService cexecutor = getExecutorService();
	try {
		cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
	} catch (Throwable t) {
		//TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. 
		//fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
		if(message instanceof Request && t instanceof RejectedExecutionException){
			Request request = (Request)message;
			if(request.isTwoWay()){
				String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
				Response response = new Response(request.getId(), request.getVersion());
				response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
				response.setErrorMessage(msg);
				channel.send(response);
				return;
			}
		}
		throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
	}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2247317.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

前端全栈 === 快速入 门 Redis

目录 简介 通过 docker 的形式来跑&#xff1a; set、get 都挺简单&#xff1a; incr 是用于递增的&#xff1a; keys 来查询有哪些 key: redis insight GUI 工具。 list 类型 left push rpush lpop 和 rpop 自然是从左边和从右边删除数据。​编辑 如果想查看数据…

Python MySQL SQLServer操作

Python MySQL SQLServer操作 Python 可以通过 pymysql 连接 MySQL&#xff0c;通过 pymssql 连接 SQL Server。以下是基础操作和代码实战示例&#xff1a; 一、操作 MySQL&#xff1a;使用 pymysql python 操作数据库流程 1. 安装库 pip install pymysql2. 连接 MySQL 示例 …

编程语言之C++诞生记!

成长路上不孤单&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a; 【14后&#x1f60a;///C爱好者&#x1f60a;///持续分享所学&#x1f60a;///如有需要欢迎收藏转发///&#x1f60a;】 今日分享关于C诞生的相关内容&#xff01; 关于【C诞…

核心差异:知识VS文档管理(+工具软件安利)

在讨论知识管理和文档管理时&#xff0c;我们经常会听到这两种说法被混淆使用。然而&#xff0c;它们各自服务于不同的目的&#xff0c;这一点至关重要。 想象一下&#xff0c;你是一名项目经理&#xff0c;面临以下两项任务&#xff1a; 存储最新的项目计划捕捉团队讨论中获…

医院挂号就诊系统(源码+数据库+报告)

基于SpringBoot的医院挂号就诊系统&#xff0c;系统包含三种角色&#xff1a;管理员、医生、用户,系统分为前台和后台两大模块&#xff0c;主要功能如下。 前台&#xff1a; - 首页&#xff1a;展示医院相关信息、推荐医生等内容。 - 健康教育&#xff1a;提供健康知识、文章等…

【热门主题】000065 探索人工智能学习框架:开启智能未来的钥匙

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;今天给大家分享一篇文章&#xff01;并提供具体代码帮助大家深入理解&#xff0c;彻底掌握&#xff01;创作不易&#xff0c;如果能帮助到大家或者给大家一些灵感和启发&#xff0c;欢迎收藏关注哦 &#x1f495; 目录 【热…

《智慧教育实时数据分析推荐项目》详细分析

一、项目介绍 1、背景介绍 在互联网、移动互联网的带动下&#xff0c;教育逐渐从线下走向线上&#xff0c;在线教育近几年一直处于行业的风口浪尖&#xff0c;那随着基础设施的不断完善&#xff0c;用户需求也发生不少变化&#xff0c;因此传统教育机构、新兴互联网企业都在探…

使用LUKS对Linux磁盘进行加密

前言 本实验用于日常学习用&#xff0c;如需对存有重要数据的磁盘进行操作&#xff0c;请做好数据备份工作。 此实验只是使用LUKS工具的冰山一角&#xff0c;后续还会有更多功能等待探索。 LUKS&#xff08;Linux Unified Key Setup&#xff09;是Linux系统中用于磁盘加密的一…

在 cmd 输入 python.exe 后不报错也无反应的问题

在 cmd 输入 python.exe 后不报错&#xff1a;‘python.exe ’不是内部或外部命令&#xff0c;也不是可运行的程序或批处理文件&#xff0c;也无反应。只是显示这样一个弹窗&#xff1a; 查了下环境变量path&#xff0c;看看有什么地方有python.exe&#xff0c;发现原来在C:\Us…

10、PyTorch autograd使用教程

文章目录 1. 相关思考 1. 相关思考

如何在 Ubuntu 22 04 上安装和配置 Ansible 自动化平台

如何在 Ubuntu 22.04 上安装和配置 Ansible 自动化平台 简介 Ansible 是一个开源项目&#xff0c;并在 Github 上收获了 63k 的 star 。它是一个极其简单的 IT 自动化平台&#xff0c;使您的应用程序和系统更易于部署和维护。使用 SSH&#xff0c;以接近简单英语的语言实现从…

PowerMILL 客制化宏 - 用户菜单定义

用户右键菜单 在PowerMILL元素浏览器空白的地方右键弹出的菜单叫用户右键菜单。用户右键菜单可以调用宏或命令或用户二次开发的应用或批处理等等。 用户右键菜单定义 用户右键菜单需要建立一个没有扩展名的 “user_menu” 名称的文件&#xff0c;一般存放在 “C:\dcam\pmill2…

006 单片机嵌入式中的C语言与代码风格规范——常识

00 环境准备&#xff1a; 配置MDK支持C99 内置stdint.h介绍 stdint.h 是从 C99 中引进的一个标准 C 库的文件 路径&#xff1a;D:\MDK\ARM\ARMCC\include 01 C语言基础语法 一般的bug很有可能是C语言功底不扎实导致…… 1.结构体 由若干基本数据类型集合组成的一种自定义数…

《生成式 AI》课程 作业6 大语言模型(LLM)的训练微调 Fine Tuning -- part1

资料来自李宏毅老师《生成式 AI》课程&#xff0c;如有侵权请通知下线 Introduction to Generative AI 2024 Spring 该文档主要介绍了国立台湾大学&#xff08;NTU&#xff09;2024 年春季 “生成式人工智能&#xff08;GenAI&#xff09;” 课程的作业 5&#xff08;GenAI HW…

ZYNQ-7020嵌入式系统学习笔记(1)——使用ARM核配置UART发送Helloworld

本工程实现调用ZYNQ-7000的内部ARM处理器&#xff0c;通过UART给电脑发送字符串。 硬件&#xff1a;正点原子领航者-7020 开发平台&#xff1a;Vivado 2018、 SDK 1 Vivado部分操作 1.1 新建工程 设置工程名&#xff0c;选择芯片型号。 1.2 添加和配置PS IP 点击IP INTEGR…

JSONCPP 数据解析与序列化

常用类接口 Json::Value 类 用于存储 JSON 数据的核心类。它支持将数据解析为对象、数组或基本类型&#xff08;如字符串、数值等&#xff09; 赋值操作符&#xff1a;Value& operator(Value other); 用于将一个 JSON 值赋给另一个 JSON 值 Json::Value value; value &…

排序(Java数据结构)

1. 排序的概念及引用 1.1 排序的概念 排序&#xff1a;所谓排序&#xff0c;就是使一串记录&#xff0c;按照其中的某个或某些关键字的大小&#xff0c;递增或递减的排列起来的操作。(所有的排序都是默认从小到大排序) 稳定性&#xff1a;假定在待排序的记录序列中&#xff…

JavaParser 的全面介绍

JavaParser 是什么&#xff1f; JavaParser 的快速介绍可以参考&#xff1a; # JavaParser的快速介绍 JavaParser是一个用于解析Java源码的开源工具&#xff0c;它提供了一种简单而有效的方式来解析和操作Java代码。JavaParser解析源码的方式主要基于其将Java代码转换为抽象语…

【君正T31开发记录】8.了解rtsp协议及设计模式

前边搞定了驱动&#xff0c;先不着急直接上手撸应用层的代码&#xff0c;先了解一下大致要用到的东西。 设计PC端先用vlc rtsp暂时H264编码&#xff08;vlc好像不支持h265,这个后边我试试&#xff09;的视频流&#xff0c;先需要支持上rtsp server&#xff0c;了解rtsp协议是必…

大数据新视界 -- Hive 数据分区:精细化管理的艺术与实践(上)(7/ 30)

&#x1f496;&#x1f496;&#x1f496;亲爱的朋友们&#xff0c;热烈欢迎你们来到 青云交的博客&#xff01;能与你们在此邂逅&#xff0c;我满心欢喜&#xff0c;深感无比荣幸。在这个瞬息万变的时代&#xff0c;我们每个人都在苦苦追寻一处能让心灵安然栖息的港湾。而 我的…