深入理解网络 I/O:单 Group 混杂模式|多 Group 主从模式

news2024/9/22 11:33:45

在这里插入图片描述

🔭 嗨,您好 👋 我是 vnjohn,在互联网企业担任 Java 开发,CSDN 优质创作者
📖 推荐专栏:Spring、MySQL、Nacos、Java,后续其他专栏会持续优化更新迭代
🌲文章所在专栏:网络 I/O
🤔 我当前正在学习微服务领域、云原生领域、消息中间件等架构、原理知识
💬 向我询问任何您想要的东西,ID:vnjohn
🔥觉得博主文章写的还 OK,能够帮助到您的,感谢三连支持博客🙏
😄 代词: vnjohn
⚡ 有趣的事实:音乐、跑步、电影、游戏

目录

  • 前言
  • 单 Group 混杂模式
    • SelectorThread
    • SelectorThreadSingletonGroup
    • SelectorSingletonGroupMainThread
    • 测试单 Group 混杂
    • 小结
  • 多 Group 主从模式
    • SelectorThread
    • SelectorGroup
    • SelectorMultiGroupMainThread
    • 测试多 Group 主从
    • 小结
  • IO Threads
  • 总结

前言

在之前的文章中,从阻塞 I/O:BIO、非阻塞 I/O:NIO、多路复用 select/poll、多路复用 epoll

重要的 I/O 模型也是现在市场上大部分中间件运用的模型也就是基于 I/O 多路复用:epoll,比如:Redis、RocketMQ、Nginx 等,这些地方都运用了 epoll,只不过在 RocketMQ 的实现采用了 Netty,而 Netty 也基于 epoll 这套多路复用模型进行实现的,上篇文章详细介绍了单 Selector 多线程模型、单 Selector 单线程模型,在此文章会才是真正意义上 Netty 的变相实现,看它是如何一步步从单 Selector 非线性模型 —> 单 Selector 线性模型 —> 单 Selector Group 混杂模式 —> 多 Selector Group 主从模式一步步演练过来的,本篇博文主要围绕单 Group 混杂模式 —> 多 Group 主从模式进行具体的展开.

Group 组的含义,它可以是充当 Boss 组角色也可以是充当 Worker 组角色
Boss:负责接收服务端 channel 以及客户端连接的 channel
Worker:负责的是 R/W 工作,由于读写操作较于频繁,它一般在分配资源上会多于优于 Boss

单 Group 混杂模式

单个 Group 混杂模式,代表的就是一个 Group 中,它既可以是 Boss 也可以 Worker,而在单个 Group 中,它就是混杂的存在,既可以做 Boss 的工作,也可以做 Worker 的工作

单 Group 中存在多个 Selector

其实在单 Group 混杂模式中,它的代码与单 Selector 单线程模型区别不大,只是在单 Group 采用了多线程的方式来进行了处理,充分利用了多核 CPU 的资源,而在单 Group 我只负责让其中一个线程进行服务端 channel,而其他的线程负责处理来自客户端 channel 的 accept 工作和客户端的读与服务端的写工作处理,接下来就看代码吧!!!

SelectorThread

package org.vnjohn.group.singleton;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * @author vnjohn
 * @since 2023/12/15
 */
public class SelectorThread implements Runnable {

    Selector selector = null;

    LinkedBlockingQueue<Channel> lbq = new LinkedBlockingQueue<>();

    SelectorThreadSingletonGroup selectorThreadGroup = null;

    public SelectorThread(SelectorThreadSingletonGroup selectorThreadGroup) {
        try {
            this.selectorThreadGroup = selectorThreadGroup;
            selector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        // loop
        while (true) {
            try {
                // 1.select:如果一直没有 fd,该方法会阻塞,一直没有返回,通过调用 wakeup() 唤醒
                System.out.println(Thread.currentThread().getName() + "   :   before select ......" + selector.keys().size());
                int num = selector.select();
                System.out.println(Thread.currentThread().getName() + "   :   after select ......" + selector.keys().size());
                // 2.处理 selectKeys
                if (num > 0) {
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        // 每一个 fd 是线性处理的过程
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        if (key.isAcceptable()) {
                            // 接受客户端的过程
                            acceptHandler(key);
                        } else if (key.isReadable()) {
                            readHandler(key);
                        } else if (key.isWritable()) {

                        }
                    }
                }

                // 3.处理 queue runTask,队列是堆里的对象,线程的栈是独立的,堆是共享的,只有方法的逻辑,本地变量是线程隔离的
                if (!lbq.isEmpty()) {
                    Channel channel = lbq.take();
                    // accept 使用的是 ServerSocketChannel
                    if (channel instanceof ServerSocketChannel) {
                        ServerSocketChannel server = (ServerSocketChannel) channel;
                        server.register(selector, SelectionKey.OP_ACCEPT);
                        System.out.println(Thread.currentThread().getName() + " register server");
                        // read / write 使用的是 SocketChannel
                    } else if (channel instanceof SocketChannel) {
                        SocketChannel client = (SocketChannel) channel;
                        ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
                        client.register(selector, SelectionKey.OP_READ, buffer);
                        System.out.println(Thread.currentThread().getName() + " register client:" + client.getRemoteAddress());
                    }
                }
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private void readHandler(SelectionKey key) {
        System.out.println(Thread.currentThread().getName() + "  readHandler.......");
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        SocketChannel client = (SocketChannel) key.channel();
        buffer.clear();
        while (true) {
            try {
                int num = client.read(buffer);
                if (num > 0) {
                    // 将读到的内容翻转,然后直接写出
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        client.write(buffer);
                    }
                    buffer.clear();
                } else if (num == 0) {
                    break;
                } else {
                    // 有可能客户端断开了-异常情况
                    System.out.println("client:" + client.getRemoteAddress() + "      closed....");
                    key.cancel();
                    client.close();
                    break;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void acceptHandler(SelectionKey key) {
        System.out.println(Thread.currentThread().getName() + "  acceptHandler.......");
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        try {
            SocketChannel client = server.accept();
            client.configureBlocking(false);
            // choose a selector and register !!
            selectorThreadGroup.nextSelectorV2(client);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Selector 线程仍然是做这么几件事情,Selector#select 阻塞调用等待事件的到来,当有新的客户端连接时,通过 SelectorThreadGroup#nextSelectorV2 去选择对应的 accept 线程进行处理往阻塞队列中塞入元素;当有事件需要读则处理读事件具体的业务方法,模拟业务场景,将接收到的数据再回写给客户端

在阻塞队列等待事件到来时,通过 channel 来区分是服务端的 channel 还是客户端的 channel,服务端的 channel 则注册一个 accept 事件,如果是客户端 channel 那么就将它注册一个 accept 后再注册 read 事件后返回.

在这里并没有区分事件的类型 accept、R/W,也没有区分客户端和服务端的事件怎么处理

SelectorThreadSingletonGroup

package org.vnjohn.group.singleton;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.Channel;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author vnjohn
 * @since 2023/12/16
 */
public class SelectorThreadSingletonGroup {
    SelectorThread[] selectorThreads;

    ServerSocketChannel server = null;

    AtomicInteger xid = new AtomicInteger(0);

    public SelectorThreadSingletonGroup(int num) {
        // num 是线程数
        selectorThreads = new SelectorThread[num];
        // 启动多个线程,一个线程对应一个 selector
        for (int i = 0; i < selectorThreads.length; i++) {
            selectorThreads[i] = new SelectorThread(this);
            new Thread(selectorThreads[i], "SelectorSingletonGroupThread-" + i).start();
        }
    }

    public void bind(int port) {
        try {
            server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(port));
            // server 注册到哪一个 selector?
            nextSelectorV2(server);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 单 group
     * Boss 服务端 ServerSocket 写死走的是 SelectorSingletonGroupThread-0 线程
     * 客户端 SocketClient 走 SelectorSingletonGroupThread-X 其他线程
     */
    public void nextSelectorV2(Channel c) {
        // 在主线程中,取到堆里的 SelectorThread 对象
        try {
            // 1.通过队列传递数据、消息
            // 2.通过 wakeup 打断阻塞,让对应的线程在打断后去自己完成注册 selector
            if (c instanceof ServerSocketChannel) {
                selectorThreads[0].lbq.put(c);
                selectorThreads[0].selector.wakeup();
            } else {
                SelectorThread nextSelectorThread = nextV2();
                nextSelectorThread.lbq.put(c);
                nextSelectorThread.selector.wakeup();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * Server 固定在 SelectorSingletonGroupThread-0 身上
     * Client 分配到其他 SelectorSingletonGroupThread-X 身上
     * 比如:0%3 = 0+1、1%3 = 1+1、2%3 = 2+1
     *
     * @return 返回的是分配要处理的线程
     */
    private SelectorThread nextV2() {
        // 单个 group 多线程时,会进行轮询处理,有可能也会导致倾斜
        int index = xid.incrementAndGet() % (selectorThreads.length - 1);
        return selectorThreads[index + 1];
    }

}

对比单 Selector 单线程模式,在选择 Selector 时做了一下区分,当属于服务端 channel 时,以写死的方式让第一个线程来进行处理,自然而然它会走到属于第一个线程的 Selector 进行处理;若是客户端 channel,让其他的线程去进行处理,无论是客户端的 accept 还是 R/W 事件

在这里能够看出问题,第一个线程资源一直浪费在哪里,它只做了服务端 channel 的 accept 工作,而其他的工作都交给了客户端去进行处理了,而客户端处理虽然有多个线程,通过 nextV2 方法分配的方式必然会造成资源的倾斜,可能有的客户端处理的事件过多,有的客户端处理的事件过少,这也就是为什么会提出负载均衡这个概念了,轮询、权重?

SelectorSingletonGroupMainThread

package org.vnjohn.group.singleton;

/**
 * @author vnjohn
 * @since 2023/12/16
 */
public class SelectorSingletonGroupMainThread {
    public static void main(String[] args) {
        // 1、创建 IO Thread(一个或多个)
        // 单个 group 混杂模式:即当 BOSS 又当 worker
        // 混杂模式:只有一个线程负责 accept,每个线程都会被分配 client 进行 R/W(包括负责客户端 accept)
        SelectorThreadSingletonGroup singletonGroup = new SelectorThreadSingletonGroup(3);

        // 2、应该把监听(9999)的 server 注册到某一个 selector 上
        singletonGroup.bind(9999);
    }
}

分配一个 Group 组,该组分配三个线程,一个线程负责服务单 accept,其他线程负责客户端连接以及客户端的读写,既充当了 Boss 工作也充当了 Worker 工作

测试单 Group 混杂

1、启动主线程 Main 方法,控制台输出内容如下:

SelectorSingletonGroupThread-1   :   before select ......0
SelectorSingletonGroupThread-0   :   before select ......0
SelectorSingletonGroupThread-2   :   before select ......0
SelectorSingletonGroupThread-0   :   after select ......0
SelectorSingletonGroupThread-0 register server
SelectorSingletonGroupThread-0   :   before select ......1

一个 Group 中三个 Selector 都启动成功,由第一个线程负责接收服务端 channel accept 事件

2、nc localhost 9999 模拟客户端来连接服务端进行读、写操作,它会由其中一个线程(并非第一个线程)进行 accept、R/W,但是客户端的 accept 会交给服务端所在的 SelectorSingletonGroupThread-0 去进行建立绑定 TCP 关系

SelectorSingletonGroupThread-0   :   after select ......1
SelectorSingletonGroupThread-0  acceptHandler.......
SelectorSingletonGroupThread-0   :   before select ......1
SelectorSingletonGroupThread-2   :   after select ......0
SelectorSingletonGroupThread-2 register client:/0:0:0:0:0:0:0:1:55759
SelectorSingletonGroupThread-2   :   before select ......1

3、当我在客户端触发写数据的操作时,它会由 SelectorSingletonGroupThread-2 去进行读、写操作

SelectorSingletonGroupThread-2   :   after select ......1
SelectorSingletonGroupThread-2  readHandler.......
SelectorSingletonGroupThread-2   :   before select ......1

从以上返回的内容来看,SelectorSingletonGroupThread-0 线程它只负责接收服务端的 accept 以及与客户端之间建立的 TCP 关系,而其他的线程负责接收客户端的 accept 以及客户端的 R/W 操作

小结

由单 Group 混杂模式来看,虽然使用了多个 Selector 多线程来利用好多核多 CPU 资源,但从实际运行的角度来看,它不是最优解,它仍然会有一些资源倾斜以及浪费问题

1、比如:SelectorSingletonGroupThread-0,它只是接收了服务端 channel accept 以及建立好 TCP 关系,它就不做任何的操作了,这部分工作对于网络而言只是一小部分的工作,大部分的工作基本上都是围绕在客户端与服务单之间发生的 R/W 操作的,从这点来看,SelectorSingletonGroupThread-0 它并完全的能够利用好这个线程能做的事情

2、其他的 Selector 线程,通过 nextV2 方法来分配某个线程来进行处理客户端 accept、R/W 事件,在多个客户端同时进来时,肯定会造成资源倾斜的问题,有的线程很忙碌,有的线程停滞不前

基于设计理念来看,需要将业务进行解耦,比如说读写分离,而在这里应该考虑的是 accept 工作,无论是服务端 channel 还是客户端 channel 应该都交由给一个线程去进行处理,而其他线程只是专注于处理客户端的读写事件,这样的好处在于,能够让我们更加理解,分水岭分界开各自要做的事情

在下面要介绍的就是「服务端 channel、客户端 channel」交由给一个 Group 去做:Boss 组
客户端与服务端之间的读写工作交给其他 Group 去做:Worker 组

多 Group 主从模式

在这里插入图片描述

如上图是 Netty 中工作的架构图,类比下面要介绍多 Group 主从模式

Boss 组:主要是用于接收客户端连接进来,然后分配到 workerGroup 线程,交由 Worker 组中的线程去 accept 注册到自己的 selector 中,后续该客户端的 fd 都交由分配到的 worker 线程去线性处理,而 Boss 只是负责与客户端连接,但不负责对客户端进行 event 处理,event 交由 worker 线程去处理.

SelectorThread

package org.vnjohn.group.multi;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

/**
 1. @author vnjohn
 2. @since 2023/12/16
 */
public class SelectorThread extends ThreadLocal<LinkedBlockingQueue<Channel>> implements Runnable {
    /**
     * 每线程对应一个 selector,多线程情况下,该主机,该程序并发进来客户端会被分配到多个 selector 上
     * 注意:每个客户端只绑定到其中一个 selector,其实不会有交互问题:每个客户端都是线性执行的
     */
    Selector selector = null;

    /**
     * 每个线程持有自己独立的 LinkedBlockingQueue 对象
     */
    LinkedBlockingQueue<Channel> linkedBlockingQueue = get();

    SelectorThreadGroup selectorThreadGroup = null;

    @Override
    protected LinkedBlockingQueue<Channel> initialValue() {
        return new LinkedBlockingQueue<>();
    }

    public SelectorThread(SelectorThreadGroup selectorThreadGroup) {
        try {
            this.selectorThreadGroup = selectorThreadGroup;
            selector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        // loop
        while (true) {
            try {
                // 1.select:如果一直没有 fd,该方法会阻塞,一直没有返回,通过调用 wakeup() 唤醒
                int num = selector.select();
                // 2.处理 selectKeys
                if (num > 0) {
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    // // 每一个 fd 是线性处理的过程
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        if (key.isAcceptable()) {
                            // 复杂:接受客户端的过程(接收之后,要注册,新的客户端注册到那个 selector 上呢?)
                            acceptHandler(key);
                        } else if (key.isReadable()) {
                            readHandler(key);
                        } else if (key.isWritable()) {

                        }
                    }
                }

                // 3.处理 queue runTask,队列是堆里的对象,线程的栈是独立的,堆是共享的
                if (!linkedBlockingQueue.isEmpty()) {
                    Channel c = linkedBlockingQueue.take();
                    // accept 使用的是 ServerSocketChannel
                    if (c instanceof ServerSocketChannel) {
                        ServerSocketChannel server = (ServerSocketChannel) c;
                        server.register(selector, SelectionKey.OP_ACCEPT);
                        System.out.println(Thread.currentThread().getName() + " register server " + server.getLocalAddress());
                    // read、write 使用的是 SocketChannel
                    } else if (c instanceof SocketChannel) {
                        SocketChannel client = (SocketChannel) c;
                        ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
                        client.register(selector, SelectionKey.OP_READ, buffer);
                        System.out.println(Thread.currentThread().getName() + " register client:" + client.getRemoteAddress());
                    }
                }
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private void readHandler(SelectionKey key) {
        System.out.println(Thread.currentThread().getName() + "  readHandler.......");
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        SocketChannel client = (SocketChannel) key.channel();
        buffer.clear();
        while (true) {
            try {
                int num = client.read(buffer);
                if (num > 0) {
                    // 将读到的内容翻转,然后直接写出
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        client.write(buffer);
                    }
                    buffer.clear();
                } else if (num == 0) {
                    break;
                } else {
                    // 有可能客户端断开了-异常情况
                    System.out.println("client:" + client.getRemoteAddress() + "      closed....");
                    key.cancel();
                    client.close();
                    break;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void acceptHandler(SelectionKey key) {
        System.out.println(Thread.currentThread().getName() + "  acceptHandler.......");
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        try {
            SocketChannel client = server.accept();
            client.configureBlocking(false);
            // 此时的 stg 已经是 worker 组了,然后让 worker 组去分发客户端的 accept
            selectorThreadGroup.nextSelectorV3(client);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 此时如果是 boss 组调用的话,会将当前类的 SelectorThreadGroup 设置为 workerGroup 来执行操作
     *
     * @param stgWorker
     */
    public void setWorker(SelectorThreadGroup stgWorker) {
        this.selectorThreadGroup = stgWorker;
    }
}

在 SelectorThread 中,本地线程独有 LinkedBlockingQueue<Channel>,线程在执行时分为以下三步:

1、Selector#select:阻塞直到拿到有状态的 FDS

2、 若 FDS 数量返回大于 0,接下来就是处理 SelectKeys 里面每个 SelectKey 对象

若当前 SelectKey 状态为 accept,就交由给 WorkerGroup 里的线程去注册
若当前 SelectKey 状态为 read,说明当前的 FD 已经在 WorkerGroup 里的线程了,直接线性处理与该客户端的数据交互即可

通过 SelectKey#attachment 方法接收客户端发送过来的数据,通过 SelectKey#channel 拿到客户端信息
通过客户端 channel > SocketChannel#read 方法读取来自客户端的数据,通过 SocketChannel#write 方法写回到客户端中

3、处理 LinkedBlockingQueue 队列中的元素,也就是 Task,队列是属于堆里的对象,而线程栈是独享的,堆是共享的,再去队列中取出对应的 channel

若 channel 为 ServerSocketChannel,则给它注册到 BossGroup 中,给它绑定上 Accept Event
若 channel 为 SocketChannel,则给它注册到 WorkerGroup 中,给它绑定上 Read Event

SelectorGroup

package org.vnjohn.group.multi;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.Channel;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author vnjohn
 * @since 2023/12/16
 */
public class SelectorThreadGroup {

    SelectorThread[] selectorThreads;

    ServerSocketChannel server = null;

    AtomicInteger xid = new AtomicInteger(0);

    //
    // 如果
    /**
     * 若 workerThreadGroup 是当前 group,说明就是 worker 组
     * 若不是当前 group,说明是 BOSS 下的 worker 组,Boss 要持有对 Worker 组的引用
     */
    SelectorThreadGroup workerThreadGroup = this;

    public void setWorker(SelectorThreadGroup selectorThreadGroup) {
        this.workerThreadGroup = selectorThreadGroup;
    }

    public SelectorThreadGroup(int num, Boolean bossGroup) {
        // num 是线程数
        selectorThreads = new SelectorThread[num];
        // 启动多个线程,一个线程对应一个 selector
        for (int i = 0; i < selectorThreads.length; i++) {
            selectorThreads[i] = new SelectorThread(this);
            new Thread(selectorThreads[i], bossGroup ? "SelectorBossGroupThread-" + i : "SelectorWorkerGroupThread-" + i).start();
        }
    }

    /**
     * 绑定一个服务端-端口
     *
     * @param port
     */
    public void bind(int port) {
        try {
            server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(port));
            // server 注册到哪一个 Boss 组的 selector?
            nextSelectorV3(server);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * boss-worker 多组
     *
     * @param c
     */
    public void nextSelectorV3(Channel c) {
        try {
            if (c instanceof ServerSocketChannel) {
                SelectorThread selectorThread = next();
                selectorThread.linkedBlockingQueue.put(c);
                selectorThread.setWorker(workerThreadGroup);
                selectorThread.selector.wakeup();
            } else {
                SelectorThread selectorThread = nextV3();
                // 1.通过队列传递数据、消息
                selectorThread.linkedBlockingQueue.put(c);
                // 2.通过打断阻塞,让对应的线程在打断后去自己完成注册 selector
                selectorThread.selector.wakeup();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private SelectorThread next() {
        // 单个 group 多线程时,会进行轮询处理,有可能也会导致倾斜
        int index = xid.incrementAndGet() % selectorThreads.length;
        return selectorThreads[index];
    }

    /**
     * 取出 Boss 中对应的 workerGroup 线程
     *
     * @return
     */
    private SelectorThread nextV3() {
        int index = xid.incrementAndGet() % workerThreadGroup.selectorThreads.length;
        return workerThreadGroup.selectorThreads[index];
    }
}

SelectorGroup#next 方法是取出 Boss Group 中的线程,SelectorGroup#nextV3 方法是取出 Worker Group 中的线程,若对应的组进行了多次调用,两种方式都是通过轮询分配工作给线程的匹配规则

BoosGroup、WorkerGroup 在处理的过程有不一样区别

若是 ServerSocket 则分配到当前 Boss Group 中的 BlockingQueue 中,若客户端连接进来了,通过 Boss 分配给到 WorkerGroup 组其中一个线程去处理客户端操作 event

SO,BossGroup 要持有 WorkerGroup 引用,才能在客户端到来时,Boss 能够通过 WorkerGroup 对象去分配一个线程处理这个客户端的操作

SelectorMultiGroupMainThread

package org.vnjohn.group.multi;

/**
 * @author vnjohn
 * @since 2023/12/16
 */
public class SelectorMultiGroupMainThread {
    public static void main(String[] args) {
        // Boss 有自己的线程组
        SelectorThreadGroup boss = new SelectorThreadGroup(3, Boolean.TRUE);
        // worker 有自己的线程组
        SelectorThreadGroup worker = new SelectorThreadGroup(3, Boolean.FALSE);
        boss.setWorker(worker);

        boss.bind(6666);
        boss.bind(7777);
        boss.bind(8888);
    }
}

BossGroup 每分配一个线程,都需要去进行 accept,触发服务端的 bind 操作,然后这个被选中的 Boss 线程必须持有对 WorkerGroup 引用

测试多 Group 主从

1、启动主线程 main 方法,控制台输出内容,如下:

SelectorBossGroupThread-0 register server /0:0:0:0:0:0:0:0:8888
SelectorBossGroupThread-2 register server /0:0:0:0:0:0:0:0:7777
SelectorBossGroupThread-1 register server /0:0:0:0:0:0:0:0:6666

每个服务端 channel 都注册到对应的 Boss Selector 线程

2、nc localhost 8888、nc localhost 6666、nc localhost 7777 模拟客户端连接,如此,每个客户端的连接都会交由给对应端口所在 Boss 线程进行 accept,并会轮询分配给到 WorkerGroup 中的线程中去,控制台输出内容如下:

SelectorBossGroupThread-0  acceptHandler.......
SelectorWorkerGroupThread-1 register client:/0:0:0:0:0:0:0:1:58175
SelectorBossGroupThread-1  acceptHandler.......
SelectorWorkerGroupThread-2 register client:/0:0:0:0:0:0:0:1:58288
SelectorBossGroupThread-2  acceptHandler.......
SelectorWorkerGroupThread-0 register client:/0:0:0:0:0:0:0:1:58397

BossGroupThread-0:8888 > WorkerGroupThread-1
BossGroupThread-1:6666 > WorkerGroupThread-2
BossGroupThread-2:7777 > WorkerGroupThread-0

小结

BossGroup 负责接收客户端,由它轮询分配 WorkerGroup 线程先 accept,然后交由给 WorkerGroup 线程去处理客户端的 R/W 操作!

核心:BossGroup 持有 WorkerGroup 中引用,各个线程持有对应 TaskQueue,Boss 处理 accept 以及分配客户端的工作,Worker 处理客户端的读写 R/W 工作

IO Threads

io threads 是为了更好发挥硬件以及 CPU 多核的处理能力,在对客户端有状态的 FD 进行 R/W 操作时,拿到数据以后,防止对当前的 FD 读取/写入时一直阻塞,将其他 FD 交由给其他业务线程去处理,io threads 就是为了解决 IO R/W 问题而存在的,也就是提高性能而存在的一种机制.

总结

该篇博文主要介绍多路复用模型 Epoll 下「单 Group 混杂模式与多 Group 主从模式」之间的区别,先是说明了在单 Group 混杂模式中由于 Event 未划分清晰造成资源倾斜问题,后者介绍了多 Group 主从模式,解决资源倾斜存在的问题,结合 BossGroup + WorkerGroup + 链接阻塞队列的方式来完成,Netty Reactor 它的工作架构图类比于此模式,只是它在此基础上做了很多的优化工作,也就是为什么大多数中间价会使用 Netty 原因,最重要的就是为了充分发挥我们硬件以及多核 CPU 资源,希望您能够喜欢,感谢三连支持!

参考文献:

  1. 《UNIX网络编程 卷1:套接字联网API(第3版)》— [美] W. Richard Stevens Bill Fenner Andrew M. Rudoff

学习帮助文档:

  • man pages:yum install man
  • pthread man pages:yum -y install man-pages

🌟🌟🌟愿你我都能够在寒冬中相互取暖,互相成长,只有不断积累、沉淀自己,后面有机会自然能破冰而行!

博文放在 网络 I/O 专栏里,欢迎订阅,会持续更新!

如果觉得博文不错,关注我 vnjohn,后续会有更多实战、源码、架构干货分享!

推荐专栏:Spring、MySQL,订阅一波不再迷路

大家的「关注❤️ + 点赞👍 + 收藏⭐」就是我创作的最大动力!谢谢大家的支持,我们下文见!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1315968.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DS考研真题总结——客观题(1)

开始整理真题中的客观小题&#xff0c;至于和算法有关的大题统一最后整理~ 定义背诵&#xff1a;数据结构是计算机存储、组织数据的方式。数据结构是指相互之间存在一种或多种特定关系的数据元素的集合。通常情况下&#xff0c;精心选择的数据结构可以带来更高的运行或者存储效…

现代雷达车载应用——第2章 汽车雷达系统原理 2.2节 汽车雷达架构

经典著作&#xff0c;值得一读&#xff0c;英文原版下载链接【免费】ModernRadarforAutomotiveApplications资源-CSDN文库。 2.2 汽车雷达架构 从顶层来看&#xff0c;基本的汽车雷达由发射器&#xff0c;接收器和天线组成。图2.2给出了一种简化的单通道连续波雷达结构[2]。这…

javaEE -17(13000字 CSS3 入门级教程)

一&#xff1a;CSS3 简介 CSS3 是 CSS2 的升级版本&#xff0c;它在 CSS2 的基础上&#xff0c;新增了很多强大的新功能&#xff0c;从而解决一些实际面临的问题&#xff0c;CSS3 在未来会按照模块化的方式去发展&#xff1a;https://www.w3.org/Style/CSS/current-work.html …

Python基础01-环境搭建与输入输出

零、文章目录 Python基础01-环境搭建与输入输出 1、Python概述 &#xff08;1&#xff09;为什么要学习Python 技术趋势&#xff1a;Python自带明星属性&#xff0c;热度稳居编程语言界前三 简单易学&#xff1a;开发代码少&#xff0c;精确表达需求逻辑&#xff1b;33个关…

linux记录

一、修改eth0网口静态IP 1、sudo vim /etc/network/interfaces 2、按E进入编辑模式&#xff0c;按i进入编辑输入&#xff1a; auto eth0 iface eth0 inet static address 192.168.3.223 netmask 255.255.255.0 gateway 192.168.3.13、按Esc退出编辑&#xff1b;:wq保存文件并…

移植LVGL到像素屏,从此玩转像素屏0门槛

硬件方面 先上渲染图 实物图 配置 主控&#xff1a;esp32 micro32 plus主频&#xff1a;240MhzFlash&#xff1a;8MPSRAM&#xff1a;2M 软件方面 众所周知&#xff0c;LVGL是一个十分优秀的图形框架&#xff0c;小到几百kb的单片机&#xff0c;大到Linux都可以运行。既然它…

【Qt QML 入门】TextEdit

TextEdit可以显示多行可编辑的格式化文。默认是无边框的&#xff0c;可以和父控件完美融合。 import QtQuick import QtQuick.Window import QtQuick.ControlsWindow {id: winwidth: 800height: 600visible: trueTextEdit {id: textEditanchors.centerIn: parenttext: "He…

【ArkTS】生命周期

页面生命周期 通常Entry修饰的组件称为页面&#xff0c;其拥有页面生命周期 onPageShow&#xff1a;页面每次显示时触发。onPageHide&#xff1a;页面每次隐藏时触发&#xff08;通常是路由跳转到其他页面了&#xff09;。onBackPress&#xff1a;当用户点击返回按钮时时触发…

谷歌的开源供应链安全

本内容是对Go项目负责人Russ Cox 在 ACM SCORED 活动上演讲内容[1]的摘录与整理。 SCORED 是Software Supply Chain Offensive Research and Ecosystem Defenses的简称, SCORED 23[2]于2023年11月30日在丹麦哥本哈根及远程参会形式举行。 摘要 &#x1f4a1; 谷歌在开源软件供应…

​SQL (关系型) 数据库-fastapi集成

SQL (关系型) 数据库 - FastAPI FastAPI不需要你使用SQL(关系型)数据库。 但是您可以使用任何您想要的关系型数据库。 在这里&#xff0c;让我们看一个使用着SQLAlchemy的示例。 您可以很容易地将SQLAlchemy支持任何数据库&#xff0c;像&#xff1a; PostgreSQLMySQLSQLi…

Visual Studio 2022封装C代码为x64和x86平台动态库

1.引言 本文介绍如何使用Visual Studio 2022将C语言函数封装成x64和x86平台上使用的动态链接库(dll文件)并生成对应的静态链接库(lib文件)&#xff0c;以及如何在C程序中调用生成的dll。 程序下载&#xff1a; 2.示例C语言程序 假设需要开发一个动态链接库&#xff0c;实现复…

Linux(操作系统)面经——part 1(持续更新中......)

1、说一说常用的 Linux 命令 mkdir创建文件夹&#xff0c;touch创建文件&#xff0c;mv移动文件内容或改名 rm-r 文件名&#xff1a;删除文件 cp拷贝&#xff1a;cp 文件1 文件2&#xff0c;cp-r跨目录拷贝 cp-r 路径1 路径2 vi 插入 &#xff1a;wqb保存退出 :q!强制退出…

W25N01GV 芯片应用

项目中处于成本考虑&#xff0c;要把Nor Flash换成低成本的Nand Flash。 这里总结下芯片应用。 总体概述&#xff1a; 1&#xff09;W25N01&#xff08;NandFlash&#xff09;和W25Q&#xff08;Nor Flash&#xff09;的操作大不一样。 NandFlash擦除以块&#xff08;128KB&…

计算BMI指数-第11届蓝桥杯选拔赛Python真题精选

[导读]&#xff1a;超平老师的Scratch蓝桥杯真题解读系列在推出之后&#xff0c;受到了广大老师和家长的好评&#xff0c;非常感谢各位的认可和厚爱。作为回馈&#xff0c;超平老师计划推出《Python蓝桥杯真题解析100讲》&#xff0c;这是解读系列的第19讲。 计算BMI指数&…

【TB作品】基于单片机的机械通风控制系统,实时温度和二氧化碳浓度

硬件&#xff1a; &#xff08;1&#xff09;51系列单片机&#xff0c;拟采用STC89C52RC&#xff1b; &#xff08;2&#xff09;DS18B20温度传感器&#xff1b; &#xff08;3&#xff09;二氧化碳浓度传感器&#xff1a;https://item.taobao.com/item.htm?spma21n57.1.0.0.1…

DS八大排序之冒泡排序和快速排序

前言 前两期我们已经对"插入排序"&#xff08;直接插入排序和希尔排序&#xff09; 和 "选择排序"&#xff08;直接选择排序和堆排序&#xff09;进行了详细的介绍~&#xff01;这一期我们再来详细介绍一组排序 &#xff1a;"交换排序"即耳熟能…

lv12 uboot移植深化 9

u-boot-2013.01移植 【实验目的】 了解u-boot 的代码结构及移植的基本方法 【实验环境】 ubuntu 14.04发行版FS4412实验平台交叉编译工具arm-none-linux-gnueabi- 【注意事项】 实验步骤中以“$”开头的命令表示在 ubuntu 环境下执行 【实验步骤】 1 建立自己的平台 1.…

在线客服系统定价因素解析:影响价格的关键因素

跨境电子商务公司必不可少的工具就是在线客服系统。企业选择在线客服系统的时候免不了要对不同产品的功能性、价格、服务等因素进行考量。今天这篇文章&#xff0c;我们就来探讨一下在线客服系统的定价因素有哪些&#xff1f;探究市面上的在线客服系统价格各异的影响因素。为大…

libp2p 快速开始

文章目录 第一部分&#xff1a;libp2p 快速入门一、什么是libp2plibp2p 发展历程libp2p的特性p2p 网络和我们熟悉的 client/server 网络的区别&#xff1a; 二、Libp2p的实现目标三、Libp2p的用途四、运行 Libp2p 协议流程libp2p 分为三层libp2p 还有一个局域网节点发现协议 mD…

27系列DGUS智能屏发布:可实时播放高清模拟信号摄像头视频

针对高清晰度的模拟信号摄像头视频画面的显示需求&#xff0c;迪文特推出27系列DGUS智能屏。该系列智能屏可适配常见的AHD摄像头、CVBS摄像头&#xff0c;支持单路1080P高清显示、两路720P同屏显示&#xff08;同一类型摄像头&#xff09;。用户通过DGUS简单开发即可实现摄像头…