10 dubbo源码学习_线程池

news2025/1/15 8:09:25

    • 1. 线程模型&线程池介绍
      • 1.1 线程池
      • 1.2 线程模型
    • 2. 线程池源码分析
      • 2.1 FixedThreadPool
      • 2.2 CachedThreadPool
      • 2.3 LimitedThreadPool
    • 3. 线程模型源码
      • 3.1 AllDispatcher
      • 3.2 DirectDispatcher
      • 3.3 MessageOnlyDispatcher
      • 3.4 ExecutionDispatcher
      • 3.5 ConnectionOrderedDispatcher

1. 线程模型&线程池介绍

1.1 线程池

dubbo内部采用netty做为通信工具,netty包括bossGroup和workerGroup,bossGroup负责接收accept连接,连接就绪后,将连接交给workerGroup进行处理;
默认情况下:
bossGroup:线程个数:1个,队列长度,Integer.MAX;创建1个NioEventLoop,这个NioEventLoop主要进行accept操作;
workerGroup:线程个数:Math.min(Runtime.getRuntime().availableProcessors() + 1, 32),一般是CPU核数+1,会创建这么多的NioEventLoop;
以上这些都是属于netty的,而在dubbo中,也有一个业务线程池,为什么又要有一个业务线程池呢,是因为我们的业务一般处理慢,如果直接使用workerGroup的线程去处理业务的话,会降级dubbo的处理性能;

修改workerGroup线程个数
通过 iothreads 属性指定workerGroup个数;

dubbor线程池配置
dubbo提供了如下的线程池,可以在:dubbo:protocol threadpool=""中指定;

  • fixed:固定大小的线程池,默认大小是200,如果未配置:queues 参数,则默认使用SynchronousQueue;
  • cached:缓存线程池,空闲一分钟自动删除,默认大小为:如果未配置 threads 参数,则 Integer.MAX_VALUE
  • limited:可伸缩线程池,但是线程池中的线程数只会增长不会收缩,这样做的目的是为了避免当进行收缩时流量突然增加造成性能问题。默认大小:如果未配置 threads 参数,则 200

1.2 线程模型

上面说了dubbo的线程池,除了netty的bossGroup和workerGroup之外,另外提供的业务线程池,那什么场景下使用业务线程池,这种策略该如何选呢?
所以dubbo提供了:dubbo:protocol dispatcher=""属性,可以指定规则:

  • all:所有(除发送)消息都由业务线程池执行;包括:请求、响应、连接、断开连接、心跳;
  • direct:所有消息都不派发到线程池,由workerGroup直接处理;
  • message:只有请求响应发给线程池,其他连接直接在IO线程上执行;
  • execution:只请求消息发到线程池,其他由IO线程池上执行;
  • connection:在IO线程上,将连接断开事件放入队列,有序逐个执行,其他消息派发到线程池;

2. 线程池源码分析

源码的入口 :
在这里插入图片描述

public WrappedChannelHandler(ChannelHandler handler, URL url) {
    this.handler = handler;
    this.url = url;

    String componentKey;
    if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
        componentKey = Constants.CONSUMER_SIDE;
        if (url.getParameter(SHARE_EXECUTOR_KEY, false)) {
            ExecutorService cExecutor = (ExecutorService) dataStore.get(componentKey, SHARED_CONSUMER_EXECUTOR_PORT);
            if (cExecutor == null) {
                cExecutor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
                dataStore.put(componentKey, SHARED_CONSUMER_EXECUTOR_PORT, cExecutor);
                cExecutor = (ExecutorService) dataStore.get(componentKey, SHARED_CONSUMER_EXECUTOR_PORT);
            }
            executor = cExecutor;
        } else {
            // 获取 URL中的 threadpool参数,参数指定的是使用的线程池类型
            executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
            dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
        }
    } else {
        componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }
}

dubbo提供了线程池ThreadPool,默认的SPI是"fixed",其实现类如下:

2.1 FixedThreadPool

public Executor getExecutor(URL url) {
    // threadname 如果未配置,默认为:Dubbo
    String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
    // threads 表示corePoolSize,默认为:200
    int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
    // queues 表示队列长度,默认为:0
    int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
    // 核心线程池大小、最大线程池大小,默认都是200,如果流量比较小的情况下,直接设置为200会导致资源占用情况严重;
    // keepalivetime 0 ms
    // 如果未配置queues的话,默认为0,会选用SynchronousQueue同步队列
    // 否则使用LinkedBlockingQueue队列,长度为queues;
    return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
            queues == 0 ? new SynchronousQueue<Runnable>() :
                    (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                            : new LinkedBlockingQueue<Runnable>(queues)),
            new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}

FixedThreadPool的话,初始化时,线程池大小即为200,不会随着流量的增长或缩小而改变线程池大小;而且队列如果指定,默认使用SynchronousQueue队列(没有长度);

2.2 CachedThreadPool

public class CachedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        // threadname 如果未配置,默认为:Dubbo
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // corethreads 表示corethreads,默认为:0
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        // threads 表示maxPoolSize,默认为:Integer.MAX_VALUE
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        // queues 表示队列长度,默认为:0
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        // alive 默认为:60 * 1000
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
        // 指定coreSize、maxPoolSize、keepaliveTime参数;
        // 队列如未指定也是默认:SynchronousQueue
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

CachedThreadPool 核心线程数为0,表示在空闲的情况下,它不需要保留任何活跃的线程,最大线程数为Integer.MAX_VALUE,队列如果未指定,则默认使用SynchronousQueue
当线程空闲1分钟时,会自动进行释放;

2.3 LimitedThreadPool

public class LimitedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        // threadname 如果未配置,默认为:Dubbo
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // corethreads 表示corethreads,默认为:0
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        // threads 表示maxPoolSize,默认为:200
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        // 核心线程数0,最大200,keepalivetime为Integer.MAX_VALUE
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

LimitedThreadPool表示,最大可扩容至200个线程,初始时为0,线程约为不过期;

3. 线程模型源码

all=com.alibaba.dubbo.remoting.transport.dispatcher.all.AllDispatcher
direct=com.alibaba.dubbo.remoting.transport.dispatcher.direct.DirectDispatcher
message=com.alibaba.dubbo.remoting.transport.dispatcher.message.MessageOnlyDispatcher
execution=com.alibaba.dubbo.remoting.transport.dispatcher.execution.ExecutionDispatcher
connection=com.alibaba.dubbo.remoting.transport.dispatcher.connection.ConnectionOrderedDispatcher

他们都继承自Dispatcher接口,程序的入口在:

// 
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
    return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
// 这里会获取Dispatcher
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
            .getAdaptiveExtension().dispatch(handler, url)));
}

dubbo 服务消费者 入口

public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
    super(url, wrapChannelHandler(url, handler));
}
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
    url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
    url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
    return ChannelHandlers.wrap(handler, url);
}
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
    return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
// 这里会获取 Dispatcher
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
            .getAdaptiveExtension().dispatch(handler, url)));
}

各类Dispatcher的处理类都继承自ChannelHandler

public interface ChannelHandler {

    /**
		连接事件
     */
    void connected(Channel channel) throws RemotingException;

    /**
		断开连接
     */
    void disconnected(Channel channel) throws RemotingException;

    /**
		发送
     */
    void sent(Channel channel, Object message) throws RemotingException;

    /**
		接收
     */
    void received(Channel channel, Object message) throws RemotingException;

    /**
		异常
     */
    void caught(Channel channel, Throwable exception) throws RemotingException;

}
  • 建立连接:connected,主要是的职责是在channel记录read、write的时间,以及处理建立连接后的回调逻辑,比如dubbo支持在断开后自定义回调的hook(onconnect),即在该操作中执行。
  • 断开连接:disconnected,主要是的职责是在channel移除read、write的时间,以及处理端开连接后的回调逻辑,比如dubbo支持在断开后自定义回调的hook(ondisconnect),即在该操作中执行。
  • 发送消息:sent,包括发送请求和发送响应。记录write的时间。
  • 接收消息:received,包括接收请求和接收响应。记录read的时间。
  • 异常捕获:caught,用于处理在channel上发生的各类异常。

3.1 AllDispatcher

所有消息都由业务线程池执行;包括:请求、响应、连接、断开连接、心跳;它的处理handler是:AllChannelHandler

public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        // 创建对应的handler处理类
        return new AllChannelHandler(handler, url);
    }
}


public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        // 它的主要任务就是初始化 executor
        super(handler, url);
    }
    // 连接
    @Override
    public void connected(Channel channel) throws RemotingException {
        // 获取线程池,实现方法由父类实现,初始化是由构造器调用父类构造器时进行的初始化;
        ExecutorService cexecutor = getExecutorService();
        try {
            // 提交到线程池执行;
            // ChannelEventRunnable有run方法,execute会执行它的run方法;
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
            //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
            if(message instanceof Request && t instanceof RejectedExecutionException){
                Request request = (Request)message;
                if(request.isTwoWay()){
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}

public class ChannelEventRunnable implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(ChannelEventRunnable.class);

    private final ChannelHandler handler;
    private final Channel channel;
    private final ChannelState state;
    private final Throwable exception;
    private final Object message;

    public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state) {
        this(channel, handler, state, null);
    }

    public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message) {
        this(channel, handler, state, message, null);
    }

    public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Throwable t) {
        this(channel, handler, state, null, t);
    }

    public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message, Throwable exception) {
        this.channel = channel;
        this.handler = handler;
        this.state = state;
        this.message = message;
        this.exception = exception;
    }

    @Override
    public void run() {
        // 这里面定义了各类消息的处理,其实也是会直接调用传递进来的handler的对应方法;
        if (state == ChannelState.RECEIVED) {
            try {
                handler.received(channel, message);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is " + message, e);
            }
        } else {
            switch (state) {
            case CONNECTED:
                try {
                    handler.connected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case DISCONNECTED:
                try {
                    handler.disconnected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case SENT:
                try {
                    handler.sent(channel, message);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is " + message, e);
                }
            case CAUGHT:
                try {
                    handler.caught(channel, exception);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is: " + message + ", exception is " + exception, e);
                }
                break;
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
            }
        }

    }

    /**
     * ChannelState
     *
     *
     */
    public enum ChannelState {

        /**
         * CONNECTED
         */
        CONNECTED,

        /**
         * DISCONNECTED
         */
        DISCONNECTED,

        /**
         * SENT
         */
        SENT,

        /**
         * RECEIVED
         */
        RECEIVED,

        /**
         * CAUGHT
         */
        CAUGHT
    }

}

AllDispatcher会创建一个AllChannelHandler,而它内部会将各类处理提交至业务线程池,业务线程池会执行对应handler的消息方法处理;

在这里插入图片描述

  • 在IO线程中执行的操作有:
    • sent操作在IO线程上执行。
    • 序列化响应在IO线程上执行。
  • 在Dubbo线程中执行的操作有:
    • received、connected、disconnected、caught都是在Dubbo线程上执行的。
    • 反序列化请求的行为在Dubbo中做的。

3.2 DirectDispatcher

前面介绍过,它是所有的消息直接由workerGroup进行处理;

public class DirectDispatcher implements Dispatcher {

    public static final String NAME = "direct";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return handler;
    }

}

所以它的源码非常简单,直接由原handler进行处理即可;
在这里插入图片描述

  • 在IO线程中执行的操作有:
    • received、connected、disconnected、caught、sent操作在IO线程上执行。
    • 反序列化请求和序列化响应在IO线程上执行。
  • 并没有在Dubbo线程操作的行为。

3.3 MessageOnlyDispatcher

在provider端,Message Only Dispatcher和Execution Dispatcher的线程模型是一致的 只有请求发给线程池,其他连接直接在IO线程上执行,对应的handler处理类:MessageOnlyChannelHandler

public class MessageOnlyChannelHandler extends WrappedChannelHandler {

    public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
}

其他事件都由父类WrappedChannelHandler实现,而父类的实现方式是直接调用handler.xx方法;

3.4 ExecutionDispatcher

只请求消息发到线程池,其他由IO线程池上执行;

public class ExecutionDispatcher implements Dispatcher {

    public static final String NAME = "execution";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ExecutionChannelHandler(handler, url);
    }

}

在这里插入图片描述

  • 在IO线程中执行的操作有:
    • sent、connected、disconnected、caught操作在IO线程上执行。
    • 序列化响应在IO线程上执行。
  • 在Dubbo线程中执行的操作有:
    • received都是在Dubbo线程上执行的。
    • 反序列化请求的行为在Dubbo中做的。

3.5 ConnectionOrderedDispatcher

在IO线程上,将连接断开事件放入队列,有序逐个执行,其他消息派发到线程池;

public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {

    protected final ThreadPoolExecutor connectionExecutor;
    private final int queuewarninglimit;

    public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
        String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // 初始化了一个connection的连接线程池,最大和核心线程数都是1;
        // LinkedQueue最大长度为Integer.MAX_VALUE;
        connectionExecutor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                new NamedThreadFactory(threadName, true),
                new AbortPolicyWithReport(threadName, url)
        );  // FIXME There's no place to release connectionExecutor!
        // 虽然 connection的连接池队列是LinkedQueue,但是,这个limit会限制 默认 1000;
        queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        try {
            // 检查队列是否超过limit;
            checkQueueLength();
            // 未超过,则交至connectionExecutor执行;
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        try {
            // 检查队列是否超过limit;
            checkQueueLength();
            // 未超过,则交至connectionExecutor执行;
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t);
        }
    }


    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            // 其余的操作还是交至 业务线程池去执行;
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //fix, reject exception can not be sent to consumer because thread pool is full, resulting in consumers waiting till timeout.
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                Request request = (Request) message;
                if (request.isTwoWay()) {
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            // 其余的操作还是交至 业务线程池去执行;
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }

    private void checkQueueLength() {
        if (connectionExecutor.getQueue().size() > queuewarninglimit) {
            logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queuewarninglimit));
        }
    }
}

在这里插入图片描述

  • 在IO线程中执行的操作有:
    • sent操作在IO线程上执行。
    • 序列化响应在IO线程上执行。
  • 在Dubbo线程中执行的操作有:
    • received、connected、disconnected、caught都是在Dubbo线程上执行的。但是connected和disconnected两个行为是与其他两个行为通过线程池隔离开的。并且在Dubbo connected thread pool中提供了链接限制、告警等能力。
    • 反序列化请求的行为在Dubbo中做的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/467992.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Visual Studio C# WinForm开发入门(6):TreeView 控件使用

TreeView控件用树显示节点层次。 例如&#xff1a;顶级目录是根(C:)&#xff0c;C盘下的每个子目录都是子节点&#xff0c;而每个子目录又都有自己的子节点 TreeView属性和方法&#xff1a; 属性说明CheckBoxes表示节点旁边是否出现复选框ImageList指定一个包含节点图标的Imag…

Spring Cloud Gateway 服务网关的部署与使用详细介绍

为什么需要服务网关 传统的单体架构中只需要开放一个服务给客户端调用&#xff0c;但是微服务架构中是将一个系统拆分成多个微服务&#xff0c;如果没有网关&#xff0c;客户端只能在本地记录每个微服务的调用地址&#xff0c;当需要调用的微服务数量很多时&#xff0c;它需要…

【音视频第20天】wireshark+tcpdump

tcpdump抓 wireshark分析 目录 tcpdumpwireshark tcpdump tcpdump参数详解 网上一搜一大堆。最全的不是用tcpdump -h而是man tcpdump来查询手册。 tcpdump -i eth0 -p udp -xx -Xs 0 -w /root/test2.cap -i 针对eth0网卡的&#xff0c;ifconfig是查看有几个网卡 -i eth0 表示…

海睿思分享 | 终于有人把指标体系和标签体系说清楚了

当前&#xff0c;随着企业数字化转型如火如荼地开展&#xff0c;在企业经营管理数字化的数据建设过程中&#xff0c;经常会遇到指标和标签的使用场景。 指标体系到底是什么&#xff1f;标签体系又是什么&#xff1f;这些疑问导致在数据分析过程中效率低下、科学性不高&#xf…

回首来路多感概,最是奋斗动人心。

我们必需要在不同的时代有不同的领悟&#xff0c;才能充满生机地去迎接生命中每个新的开始。 文章目录 前言 初心 成长 收获 憧憬 出发 前言 今天是我成为csdn创作者一周年纪念日&#xff0c;我非常开心能够和大家分享我的写作之旅。在这一年里&#xff0c;我经历了许多挑…

ChatGPT实现数据集模拟生成,ChatGPT实现密码生成

数据集模拟生成 之前章节我们已经演示过ChatGPT 如何根据 prompt 编写文章或续写文章&#xff0c;文本生成的作用不仅仅在语文方面有用&#xff0c;本节我们演示另一种场景&#xff0c;利用 ChatGPT 来生成数据。看似作用差不多&#xff0c;其实这是目前开源社区非常常用的大语…

低代码和零代码二子争夺,你扶谁上位?

传统的软件研发方式目前并不能很好地满足企业的需求&#xff1a;人员成本高、研发时间长、运维复杂。这时低代码或零代码工具的出现为快速开发软件提供了更好的思路。对于不太了解两者的人来说&#xff0c;零代码和低代码是什么&#xff1f;又有什么联系与区别&#xff1f; 什么…

手把手教你安装telnet(离线方式+在线方式)

系列文章目录 文章目录 系列文章目录前言一、telnet是什么&#xff1f;二、安装步骤总结 前言 一、telnet是什么&#xff1f; Telnet协议是TCP/IP协议族中的一员&#xff0c;是Internet远程登录服务的标准协议和主要方式。它为用户提供了在本地计算机上完成远程主机工作的能力…

[openwrt] valgrind定位内存泄漏

目录 要求 valgrind 简介 工具介绍 linux程序的内存布局 内存检查的原理 valgrind的使用 使用举例 内存泄漏 内存越界 内存覆盖 Linux分配虚拟内存&#xff08;申请内存&#xff09;的两种方式 brk和mmap 要求 被调试程序带有-g参数编译&#xff0c;携带debug参数…

二维码在数字化班组管理中的应用

为了更好地贯彻落实集团公司对班组安全建设的要求&#xff0c;可以运用“加法思维”&#xff0c;勇于直面当前安全班组建设中的突出问题&#xff0c;敢于创新和突破&#xff0c;自主搭建数字化班组管理平台&#xff0c;以进一步提升班组安全建设水平。 本文将详细介绍搭建设备…

腾讯云CDN的HTTPS服务是收费的?

腾讯云内容分发网络CDN的HTTPS请求数收费了&#xff0c;之前阿里云CDN是收费的&#xff0c;现在腾讯云HTTPS请求数也开始收费的&#xff0c;但是腾讯云还是很良心的每月300万次以内是不计费的&#xff0c;只有超出free额度的部分才另外收费&#xff0c;HTTPS请求数价格为0.05元…

Windows10资源管理器使用

文章目录 前言二、关联菜单操作1.分组展示2.添加选择复选框3.使用窗格模式4.功能区折叠二、“文件夹选项”对话框操作1.访问模式调整2.状态栏控制总结前言 目前Windows系统中的使用较多当属Windows10,资源管理器属于Windows系统中一个常用工具。本文总结了Windows 10 专业版下…

类的默认成员函数

为什么会有构造函数和析构函数呢&#xff1f; 1、初始化和销毁经常忘记 2、有些地方写起来很繁琐. Stack有了构造和析构&#xff0c;就不怕忘记写初始化和清理函数了&#xff0c;也简化了 例如在队列oj时&#xff0c;忘记释放&#xff0c;造成内存泄漏 构造函数 主要任务&am…

spark on k8s 部署的一点理解

Running Spark on Kubernetes - Spark 3.4.0 Documentation (apache.org) 前提条件 1. 本地有spark安装包&#xff0c;以便于执行 spark submit 命令 2. k8s 集群&#xff0c;以及本地的有 kubectl 并且配置的用户包含相关权限&#xff0c;具体可以参考官网 一些观念的转变 …

二百左右的蓝牙耳机哪款好?200左右音质最好的蓝牙耳机

在日常生活中离不开智能手机&#xff0c;特别是对无线蓝牙耳机的需求程度也越来越高&#xff0c;但是市面上有很多的蓝牙耳机戴久了耳朵会出现不舒服&#xff0c;为了获得更好的使用体验&#xff0c;我整理了市面上200左右价位佩戴和音质都表现不错的蓝牙耳机。 一、南卡小音舱…

Leetcode 并查集详解

在一些应用的问题中&#xff0c;需将n个不同的元素划分成一组不相交的集合。开始时&#xff0c;每个元素自成一格单元素集合&#xff0c;然后按一定顺序将属于同一组的元素的集合合并。其间要反复用到查询某个元素属于哪个集合的运算。适合于描述这类问题的抽象数据类型称为并查…

【22】linux进阶——文本处理工具:cut、awk、sed

大家好&#xff0c;这里是天亮之前ict&#xff0c;本人网络工程大三在读小学生&#xff0c;拥有锐捷的ie和红帽的ce认证。每天更新一个linux进阶的小知识&#xff0c;希望能提高自己的技术的同时&#xff0c;也可以帮助到大家 另外其它专栏请关注&#xff1a; 锐捷数通实验&…

一段凄惨Android 面试经历分享,败在了项目架构原理上……

大家应该看过很多分享面试成功的经验&#xff0c;但根据幸存者偏差的理论&#xff0c;也许多看看别人面试失败在哪里&#xff0c;对自己才更有帮助。 这是一位网友分享的面试经历&#xff0c;他准备了3个月&#xff0c;刚刚参加完字节跳动的第三面&#xff0c;视频面&#xff…

数说故事联合中山大学国际关系学院共建「国关数据实验室」,深化数据科学与国际关系融合创新

4月9日&#xff0c;数说故事联合中山大学国际关系学院共建的「国关数据实验室」正式启动&#xff0c;此次强强联合是双方在国际关系领域的一项创新尝试&#xff0c;该实验室旨在整合数说故事和国际关系学院师生的资源优势&#xff0c;将数据科学与国际关系研究相结合&#xff0…

真实还原美团4面经历,低学历成功拿到20K Offer...

个人背景 如标题所示&#xff0c;我的个人背景非常简单&#xff0c;软件测试经验 1 年半&#xff0c;学历普通&#xff0c;2 本毕业后出来就一直在做功能测试&#xff0c;在公司每天重复的工作对我的技术提升并没有什么帮助&#xff0c;但小镇出来的我也深知自我努力的重要性&…