Dubbo的线程模型

news2025/1/24 17:46:52

1 线程模型概述

Dubbo默认的底层网络通信使用的是Netty。

服务提供方NettyServer使用两级线程池,其EventLoopGroup(boss)主要用来接收客户端的连接请求,并把完成TCP三次握手的连接分发给EventLoopGroup(worker)来处理。boss和worker线程组称为I/O线程。之后连接可以直接被I/O线程处理,或者派发给业务线程池进行处理。

根据请求的消息是被I/O线程处理还是被业务线程池处理,Dubbo提供了以下几种线程模型(或称线程调度模型)。其中 all 模型是默认的线程模型。

1.1 all(AllDispatcher)

所有消息都被派发到业务线程池,这些消息包括请求、响应、连接事件、断开事件、心跳事件等。

1.2 direct(DirectDispatcher)

所有消息都不被派发到业务线程池,全部在I/O线程上执行。

1.3 message(MessageOnlyDispatcher)

只有请求、响应消息被派发到业务线程池,其他消息直接在I/O线程上执行。

1.4 execution(ExecutionDispatcher)

只有请求消息被派发到业务线程池,其他消息直接在I/O线程上执行。

1.5 connection(ConnectionOrderedDispatcher)

除连接、断开事件以外,其他消息都被派发到业务线程池。在I/O线程上将连接、断开事件放入队列,有序的逐个执行。

2 源码分析

在Dubbo中,线程模型的扩展接口为Dispatcher,其提供的扩展实现都实现了该接口。

Dispatcher只有一个dispatch方法,用于分发消息给业务线程池,源码如下所示。

/**
 * ChannelHandlerWrapper (SPI, Singleton, ThreadSafe)
 */
@SPI(value = AllDispatcher.NAME, scope = ExtensionScope.FRAMEWORK)
public interface Dispatcher {

    /**
     * dispatch the message to threadpool.
     *
     * @param handler
     * @param url
     * @return channel handler
     */
    @Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"})
    // The last two parameters are reserved for compatibility with the old configuration
    ChannelHandler dispatch(ChannelHandler handler, URL url);

}

2.1 all(AllDispatcher)

所有消息都被派发到业务线程池,这些消息包括请求、响应、连接事件、断开事件、心跳事件等。

AllDispatcher的源码如下所示,其核心实现为AllChannelHandler类。

/**
 * default thread pool configure
 */
public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }

}


public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    // 连接完成事件,交给业务线程池处理
    @Override
    public void connected(Channel channel) throws RemotingException {
        // 业务线程池
        ExecutorService executor = getSharedExecutorService();
        try {
            // 执行连接事件
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    // 连接断开事件,交给业务线程池处理
    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService executor = getSharedExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

    // 请求响应事件,交给业务线程池处理
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if(message instanceof Request && t instanceof RejectedExecutionException){
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    // 异常处理事件,交给业务线程池处理
    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getSharedExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}

其中AllChannelHandler类实现了ChannelHandler,ChannelHandler的主要方法如下所示。

@SPI(scope = ExtensionScope.FRAMEWORK)
public interface ChannelHandler {

    /**
     * on channel connected.
     *
     * @param channel channel.
     */
    void connected(Channel channel) throws RemotingException;

    /**
     * on channel disconnected.
     *
     * @param channel channel.
     */
    void disconnected(Channel channel) throws RemotingException;

    /**
     * on message sent.
     *
     * @param channel channel.
     * @param message message.
     */
    void sent(Channel channel, Object message) throws RemotingException;

    /**
     * on message received.
     *
     * @param channel channel.
     * @param message message.
     */
    void received(Channel channel, Object message) throws RemotingException;

    /**
     * on exception caught.
     *
     * @param channel   channel.
     * @param exception exception.
     */
    void caught(Channel channel, Throwable exception) throws RemotingException;

}

2.2 direct(DirectDispatcher)

所有消息都不被派发到业务线程池,全部在I/O线程上执行。DirectDispatcher的源码如下所示。

public class DirectDispatcher implements Dispatcher {

    public static final String NAME = "direct";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new DirectChannelHandler(handler, url);
    }

}


public class DirectChannelHandler extends WrappedChannelHandler {

    public DirectChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        if (executor instanceof ThreadlessExecutor) {
            try {
                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
            }
        } else {
            handler.received(channel, message);
        }
    }

}

2.3 message(MessageOnlyDispatcher)

只有请求、响应消息被派发到业务线程池,其他消息直接在I/O线程上执行。MessageOnlyDispatcher的源码如下所示。

/**
 * Only message receive uses the thread pool.
 */
public class MessageOnlyDispatcher implements Dispatcher {

    public static final String NAME = "message";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new MessageOnlyChannelHandler(handler, url);
    }

}


public class MessageOnlyChannelHandler extends WrappedChannelHandler {

    public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if(message instanceof Request && t instanceof RejectedExecutionException){
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

}

2.4 execution(ExecutionDispatcher)

只有请求消息被派发到业务线程池,其他消息直接在I/O线程上执行。ExecutionDispatcher的源码如下所示。

public class ExecutionDispatcher implements Dispatcher {

    public static final String NAME = "execution";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ExecutionChannelHandler(handler, url);
    }

}


public class ExecutionChannelHandler extends WrappedChannelHandler {

    public ExecutionChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);

        if (message instanceof Request) {
            try {
                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                // FIXME: when the thread pool is full, SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly,
                // therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent
                // this scenario from happening, but a better solution should be considered later.
                if (t instanceof RejectedExecutionException) {
                    sendFeedback(channel, (Request) message, t);
                }
                throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);
            }
        } else if (executor instanceof ThreadlessExecutor) {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } else {
            handler.received(channel, message);
        }
    }
}

2.5 connection(ConnectionOrderedDispatcher)

除连接、断开事件以外,其他消息都被派发到业务线程池。在I/O线程上将连接、断开事件放入队列,有序的逐个执行。ConnectionOrderedDispatcher的源码如下所示。

/**
 * connect disconnect ensure the order
 */
public class ConnectionOrderedDispatcher implements Dispatcher {

    public static final String NAME = "connection";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ConnectionOrderedChannelHandler(handler, url);
    }

}


public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {

    // I/O线程的线程池
    protected final ThreadPoolExecutor connectionExecutor;
    private final int queueWarningLimit;

    public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
        String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        connectionExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(url.getPositiveParameter(CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
            new NamedThreadFactory(threadName, true),
            new AbortPolicyWithReport(threadName, url)
        );  // FIXME There's no place to release connectionExecutor!
        queueWarningLimit = url.getParameter(CONNECT_QUEUE_WARNING_SIZE, DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
    }

    // 连接建立事件由I/O线程处理
    @Override
    public void connected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    // 连接断开事件由I/O线程处理
    @Override
    public void disconnected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

    // 请求、响应事件由业务线程处理
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    // 异常处理事件由业务线程处理
    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getSharedExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }

    private void checkQueueLength() {
        if (connectionExecutor.getQueue().size() > queueWarningLimit) {
            logger.warn(TRANSPORT_CONNECTION_LIMIT_EXCEED, "", "", "connectionordered channel handler queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queueWarningLimit);
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1495156.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【音视频开发好书推荐】RTC程序设计:实时音视频权威指南

目录 1、WebRTC概述2、好书推荐3、本书内容4、本书特色5、作者简介6、谁适合看这本书 1、WebRTC概述 WebRTC&#xff08;Web Real-Time Communication&#xff09;是一个由Google发起的实时音视频通讯C开源库&#xff0c;其提供了音视频采集、编码、网络传输&#xff0c;解码显…

【考研数学】基础660太难了?一个办法搞定660

觉得题目太难&#xff0c;大概率是题目超出了自己当前的水平 题型没见过&#xff0c;或者太复杂&#xff0c;属于跳级学习了&#xff0c;正确的思路就是回归到自己的水平线&#xff0c;题目略难即可。 这样做题的话&#xff0c;大部分题目涉及的点不会超出自己的能力范围&…

【详识JAVA语言】String 类1

String类的重要性 在C语言中已经涉及到字符串了&#xff0c;但是在C语言中要表示字符串只能使用字符数组或者字符指针&#xff0c;可以使用标准库提 供的字符串系列函数完成大部分操作&#xff0c;但是这种将数据和操作数据方法分离开的方式不符合面相对象的思想&#xff0c;而…

python之海龟绘图

海龟绘图&#xff08;turtle&#xff09;是一个Python内置的绘图库&#xff0c;也被称为“Turtle Graphics”或简称“Turtles”。它采用了一种有趣的绘图方式&#xff0c;模拟一只小海龟在屏幕上爬行&#xff0c;而小海龟爬行的路径就形成了绘制的图形。这种绘图方式最初源自20…

考研数学——高数:多元函数微分法及其应用

因为复习阶段全篇很细节的写下来一来比较费时间&#xff0c;二容易导致为了记笔记而记。 接下来的内容只会保留上课中比较有意义的地方&#xff0c;以及有自己助于理解的想法 全微分 助记&#xff1a; 证明是否可微&#xff0c;首先判断两个偏导数是否存在&#xff0c;不存在则…

插入排序和归并排序

插入排序&#xff0c;Insertion Sort. 给出伪代码 for i 1,2,...,n-1Insert A[i] into Sorted array A[0:i-1]by swaping down to the correct position. 冒泡排序 冒泡排序就是一种插入排序算法。 i ← 1 while i < length(A)j ← iwhile j > 0 and A[j-1] > A…

FlyClient SPV client轻量化

这篇文章主要是为了构建一种轻客户端的算法。 如果使用SPV 的方式验证交易&#xff0c;每个client上面需要存储非常多的header。使用 proofs of proof-of-work 的方式&#xff0c;使得请客户端仅仅下载少量的区块头就能验证这一条链的安全性&#xff0c;然后再对包含交易的区块…

【详识JAVA语言】String类2

常用方法 字符串的不可变性 String是一种不可变对象. 字符串中的内容是不可改变。字符串不可被修改&#xff0c;是因为&#xff1a; 1. String类在设计时就是不可改变的&#xff0c;String类实现描述中已经说明了 以下来自JDK1.8中String类的部分实现&#xff1a; String类…

2D/3D相机手眼标定总结

1. 九点标定 九点标定法的本质&#xff1a; 无需进行相机内参标定&#xff0c;只能识别x&#xff0c;y坐标&#xff0c;属于2D平面标定&#xff0c;在标定过程中z是未知的。 该算法的核心是仿射变换&#xff0c;即图像坐标系到机器人坐标系的2D仿射变换&#xff08;注意这里并不…

【python--比对两个列表获取列表中出现频率最高的词及频率】

&#x1f680; 作者 &#xff1a;“码上有前” &#x1f680; 文章简介 &#xff1a;Python &#x1f680; 欢迎小伙伴们 点赞&#x1f44d;、收藏⭐、留言&#x1f4ac; python练习题 完整代码 完整代码 from collections import Counter from data_keywords import extract_…

【Linux】文件传输工具lrzsz的安装与使用

目录 一、关于lrzsz 二、安装lrzsz 三、lrzsz的说明及使用 1、上传命令rz 2、下载命令sz 一、关于lrzsz 在开发的过程中&#xff0c;经常遇到 需要在 Linux 和 Windows 之间上传下载文件的情况 这时&#xff0c;一般都是使用 FTP 或者 WinSCP 工具进行上传下载, 虽然也能…

SRIO—IP讲解及说明

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、SRIO IP 概述1.1 逻辑层1.1.1 I/O 端口(I/O Port)1.1.2 消息端口(Messaing Port)1.1.3 用户自定义端口(User-Defined Port)1.1.4 维护端口(Maintenance Port)1.2 缓冲层1.3 物理层…

python+django+vue房屋租赁系统 8gwmf

房屋租赁系统在设计与实施时&#xff0c;采取了模块性的设计理念&#xff0c;把相似的系统的功能整合到一个模组中&#xff0c;以增强内部的功能&#xff0c;减少各组件之间的联系&#xff0c;从而达到减少相互影响的目的。如房源信息、预约信息、求租信息模块等[12]。 管理员后…

java工程师面试简历模板,2024谈一下当下最合适的Java架构

前言 这些算法&#xff0c;都是小编一点一点看的大佬们的方法&#xff0c;自己积累的. 如果有什么描述的不对的地方还望大佬赐教 多交流才能进步&#xff0c;加油&#xff0c;冲冲冲&#xff01;&#xff01;&#xff01; 目录 一、冒泡排序 二、选择排序 三、插入排序 四、快速…

Redis中的RDB和AOF持久化机制(一)

Redis持久化 RDB快照(snapshot). 在默认情况下&#xff0c;Redis将内存数据库快照保存在名字为dump.rdb的二进制文件中.Redis可以进行设置,让它在"N秒内数据集至少有M个改动"这一条件被满足时&#xff0c;自动保存一次数据集。比如说&#xff0c;以下设置会让Redis…

软件测试需求分析如何编写?为什么要进行测试需求分析?

在软件开发的过程中&#xff0c;软件测试需求分析是至关重要的一个环节。测试需求分析是指对待测软件的需求进行全面细致的分析&#xff0c;明确软件测试的目标和范围&#xff0c;为测试活动的进行提供指导。通过对软件需求的详细分析&#xff0c;可以确保测试人员清楚了解软件…

配置与管理防火墙

配置与管理防火墙 1&#xff0c;概念&#xff1a;设置在不同网络或网络安全域之间的一系列部件的组合。 2&#xff0c;功能&#xff1a;保护内网中易手攻击的服务&#xff1b;控制内外网之间网络系统的访问&#xff1b;隐藏内网的IP地址及结构的细节&#xff0c;提高网络保护…

3月每日一题笔记

感谢我的好朋友的鼓励 3月4日 两种等价方式&#xff1f;都是错误的 ->加减中不能使用等价无穷小&#xff1f; ->不全面。 两项无穷小相减, 那么两项无穷小比值的极限不等于 1 时, 或者两项无穷小相加时, 其比值极限不等于 −1 时, 代数和差各项可以用等价无穷小替换 等…

基于springboot的海滨体育馆管理系统的设计与实现论文

摘 要 本基于Spring Boot的海滨体育馆管理系统设计目标是实现海滨体育馆的信息化管理&#xff0c;提高管理效率&#xff0c;使得海滨体育馆管理工作规范化、高效化。 本文重点阐述了海滨体育馆管理系统的开发过程&#xff0c;以实际运用为开发背景&#xff0c;基于Spring Boot…