Spring架构篇--2.7.1 远程通信基础--Netty原理--NioEventLoopGroup

news2024/11/23 8:22:28

前言:在使用Netty 时不管是服务端还是客户端都需要 new NioEventLoopGroup 对象进行工作,NioEventLoopGroup的作用是什么呢;

1 NioEventLoopGroup 类图:
在这里插入图片描述

从类名字来看它是一个Nio 流的事件轮询器组,既然是一组顾名思义这个组里应该存放了同一种类型的字对象;从类图看它是继承Executor 它是Java 线程池的顶级类,说明它是具有创建线程执行线程的能力;

NioEventLoopGroup 对于任务的处理:

package com.example.nettydemo.netty.task;

import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

@Slf4j
public class EventLoopTask {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 可以处理 io 事件,普通任务,定时任务
        EventLoopGroup group = new NioEventLoopGroup(2);
        // 可以处理,普通任务,定时任务
//        EventLoopGroup defaultGroup = new DefaultEventLoopGroup();
        Future future = group.next().submit(() -> {
            log.debug("普通任务");
            return "success";
        });
        // 异步结果获取 nio 获取结果
        future.addListener(new FutureListener<String>() {
            @Override
            public void operationComplete(Future<String> stringFuture) throws Exception {
                log.debug("结果:{}", stringFuture.get());
            }
        });
        // 同步调用 main 线程获取结果
        log.debug("结果:{}", future.get());

        // 定时任务 延迟2s 之后 以1s 的频率执行任务
        group.next().scheduleAtFixedRate(() -> {
            log.debug("定时任务");
        }, 2, 1, TimeUnit.SECONDS);

//        group.shutdownGracefully();

    }
}

代码比价简单直接看下执行结果:在这里插入图片描述

2 NioEventLoopGroup的初始化 :

2.1 当创建 NioEventLoopGroup 时 会看到其嵌了很多层的构造方法:
NioEventLoopGroup 类:

public NioEventLoopGroup() {
	// 核心的线程数量此时给的时0
  this(0);
}

public NioEventLoopGroup(int nThreads) {
	// 线程的执行器Executor 给的是null
    this(nThreads, (Executor)null);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
	// SelectorProvider 根据当前的环境获得 SelectorProvider的单例
	// 如果是window 环境 最终会给到WindowsSelectorProvider 对象
    this(nThreads, threadFactory, SelectorProvider.provider());
}

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
	// DefaultSelectStrategyFactory.INSTANCE 默认的选择策略
    this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

public NioEventLoopGroup(int nThreads, Executor executor, SelectorProvider selectorProvider, SelectStrategyFactory selectStrategyFactory) {
// RejectedExecutionHandlers.reject() 拒绝的处理器,直接抛出了异常
        super(nThreads, executor, new Object[]{selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()});
    }

SelectorProvider.provider()可以参考:window–Select.open()

选择策略类 DefaultSelectStrategyFactory.INSTANCE:

public final class DefaultSelectStrategyFactory implements SelectStrategyFactory {
    public static final SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory();

    private DefaultSelectStrategyFactory() {
    }

    public SelectStrategy newSelectStrategy() {
        return DefaultSelectStrategy.INSTANCE;
    }
}

拒绝策略类 RejectedExecutionHandlers:

private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
    public void rejected(Runnable task, SingleThreadEventExecutor executor) {
        throw new RejectedExecutionException();
    }
};

此时我们可以看到在构造 NioEventLoopGroup所用到的几个参数:

  • super(nThreads, executor, new Object[]{selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()}
  • nThreads: 处理任务的核心线程数 此时为 0
  • executor: 线程的执行器 此时为 null
  • selectorProvider:Selector的提供者 此时为 WindowsSelectorProvider
  • selectStrategyFactory: 默认的选择器策略,此时为 DefaultSelectStrategyFactory 对象
  • RejectedExecutionHandlers.reject():拒绝的策略直接抛出了RejectedExecutionException

执行到此这里有个关键代码:SelectorProvider.provider(),可以看到此时已经获取到了selector 对象并当做参数向下传递:

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
	// SelectorProvider 根据当前的环境获得 SelectorProvider的单例
	// 如果是window 环境 最终会给到WindowsSelectorProvider 对象
    this(nThreads, threadFactory, SelectorProvider.provider());
}

最后super 的方法会 进入其父类MultithreadEventLoopGroup 继续完成初始化:

2.2 MultithreadEventLoopGroup 类的初始化:

// 核心线程数的获取,可以理解为当前电脑cpu的核心数量*2;
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

 protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
 	// 继续进入父类方法,如果核心数为0,则取DEFAULT_EVENT_LOOP_THREADS ;
 	// 如果自己在new NioEventLoopGroup(核心线程数),传入了则使用自定义的核心线程数
     super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
 }

这里看到一个关键点 NioEventLoopGroup 中线程数量的定义: 核心线程数的获取,可以理解为当前电脑cpu的核心数量*2;

private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2))

2.3 MultithreadEventExecutorGroup类的初始化:

// 分组内的 多个事件执行器
 private final EventExecutor[] children;
 private final Set<EventExecutor> readonlyChildren;
 private final AtomicInteger terminatedChildren;
 private final Promise<?> terminationFuture;
 // 事件执行器 选择工厂
 private final EventExecutorChooserFactory.EventExecutorChooser chooser;
 
 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
	// 已经完成工作的个数初始为0
   this.terminatedChildren = new AtomicInteger();
   // 异步线程任务的执行(任务队列的单线程事件执行器)
   this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
   if (nThreads <= 0) {
       throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
   } else {
       if (executor == null) {
       		// 初始化 线程工厂类
       		// (对要创建的线程,优先级,是否守护线程,线程前准,线程分组 赋值)
           executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory());
       }
		// NioEventLoopGroup 子事件处理个数 数组初始化
       this.children = new EventExecutor[nThreads];

       int j;
       // 创建 nThreads 个数的 EventExecutor 事件执行器
       for(int i = 0; i < nThreads; ++i) {
           boolean success = false;
           boolean var18 = false;

           try {
               var18 = true;
               // 子事件 初始化 
               // 每个子事件都放入了NioEventLoopGroup 父类对象
               // 每个子事件都有构建线程并且执行任务的能力
               // 每个子事件都有integer 最大值的任务队列
               // 每个子事件 都初始化了一个Selector 并且将Selector 中已经准备好的
               // select 从set 结构优化到了数组结构
               this.children[i] = this.newChild((Executor)executor, args);
               success = true;
               var18 = false;
           } catch (Exception var19) {
           		// 子事件初始化异常 抛出异常 并在finally 中将已经创建的子事件进行关闭
               throw new IllegalStateException("failed to create a child event loop", var19);
           } finally {
               if (var18) {
                   if (!success) {
                       int j;
                       for(j = 0; j < i; ++j) {
                           this.children[j].shutdownGracefully();
                       }

                       for(j = 0; j < i; ++j) {
                           EventExecutor e = this.children[j];

                           try {
                               while(!e.isTerminated()) {
                                   e.awaitTermination(2147483647L, TimeUnit.SECONDS);
                               }
                           } catch (InterruptedException var20) {
                               Thread.currentThread().interrupt();
                               break;
                           }
                       }
                   }

               }
           }

           if (!success) {
               for(j = 0; j < i; ++j) {
                   this.children[j].shutdownGracefully();
               }

               for(j = 0; j < i; ++j) {
                   EventExecutor e = this.children[j];

                   try {
                       while(!e.isTerminated()) {
                           e.awaitTermination(2147483647L, TimeUnit.SECONDS);
                       }
                   } catch (InterruptedException var22) {
                       Thread.currentThread().interrupt();
                       break;
                   }
               }
           }
       }
		// 事件选择工厂赋值 如果NioEventLoopGroup分组下的时间处理是2的倍数 
		// 使用位运算取得下次可以处理时间的某个EventExecutor
		// 否则使用取模的方式取得下次可以处理时间的某个EventExecutor
		// (isPowerOfTwo(executors.length) ? new PowerOfTwoEventExecutorChooser(executors) 
		// : new GenericEventExecutorChooser(executors));
       this.chooser = chooserFactory.newChooser(this.children);
       //
       FutureListener<Object> terminationListener = new FutureListener<Object>() {
           public void operationComplete(Future<Object> future) throws Exception {
               if (MultithreadEventExecutorGroup.this.terminatedChildren.incrementAndGet() == MultithreadEventExecutorGroup.this.children.length) {
               // 当NioEventLoopGroup分组下的所有EventExecutor 都完成了任务,
               // 则设置NioEventLoopGroup 的任务执行成功数据
                   MultithreadEventExecutorGroup.this.terminationFuture.setSuccess((Object)null);
               }

           }
       };
       EventExecutor[] var24 = this.children;
       j = var24.length;
	 // 为每一个 EventExecutor 添加 terminationListener 事件回调
	 //  当每个EventExecutor 任务完成之后,都进入改监听方法
       for(int var26 = 0; var26 < j; ++var26) {
           EventExecutor e = var24[var26];
           e.terminationFuture().addListener(terminationListener);
       }
		// 将NioEventLoopGroup 分组下的所有EventExecutor 进行复制并赋值
       Set<EventExecutor> childrenSet = new LinkedHashSet(this.children.length);
       Collections.addAll(childrenSet, this.children);
       this.readonlyChildren = Collections.unmodifiableSet(childrenSet);
   }
}

到这里可以看到MultithreadEventExecutorGroup 一些关键点的初始化:
关键点1:线程工厂类的初始化

if (executor == null) {
		// 初始化 线程工厂类
		// (对要创建的线程,优先级,是否守护线程,线程前准,线程分组 赋值)
    executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory());
}

关键点2:NioEventLoopGroup 中每个NioEventLoop 的创建:
声明NioEventLoopGroup 的NioEventLoop 数量:

 this.children = new EventExecutor[nThreads];

每个NioEventLoop 的初始化:

   this.children[i] = this.newChild((Executor)executor, args);

关键点3:下一个执行任务的EventExecutor获取的定义:

   this.chooser = chooserFactory.newChooser(this.children);

改类中一些属性赋值的具体方法实现:
(1)MultithreadEventExecutorGroup 类中 newDefaultThreadFactory():

protected ThreadFactory newDefaultThreadFactory() {
    return new DefaultThreadFactory(this.getClass());
}
// 设置改工厂创建线程类名称,非守护线程,线程的优先级为5
public DefaultThreadFactory(Class<?> poolType) {
    this((Class)poolType, false, 5);
}
public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
   this(toPoolName(poolType), daemon, priority);
}
// 获取线程池的类名称
public static String toPoolName(Class<?> poolType) {
    ObjectUtil.checkNotNull(poolType, "poolType");
    String poolName = StringUtil.simpleClassName(poolType);
    switch (poolName.length()) {
        case 0:
            return "unknown";
        case 1:
            return poolName.toLowerCase(Locale.US);
        default:
            return Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1)) ? Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1) : poolName;
    }
}
public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
   this(poolName, daemon, priority, System.getSecurityManager() == null ? Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
}
// threadGroup 线程进行分组(取的当前线程的线程组),方便追踪
public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
	// 线程属性赋值
	// 线程id
   this.nextId = new AtomicInteger();
   ObjectUtil.checkNotNull(poolName, "poolName");
   if (priority >= 1 && priority <= 10) {
   		// 线程名字
       this.prefix = poolName + '-' + poolId.incrementAndGet() + '-';
       // 线程是否守护线程
       this.daemon = daemon;
       // 线程的优先级
       this.priority = priority;
       // 线程的分组
       this.threadGroup = threadGroup;
   } else {
       throw new IllegalArgumentException("priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
   }
}

(2) new ThreadPerTaskExecutor(this.newDefaultThreadFactory()):

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;
	//  threadFactory  赋值
    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = (ThreadFactory)ObjectUtil.checkNotNull(threadFactory, "threadFactory");
    }

    public void execute(Runnable command) {
        this.threadFactory.newThread(command).start();
    }
}

this.newChild((Executor)executor, args): 子事件的初始化:
executor :线程工厂类newDefaultThreadFactory
args 数组:
selectorProvider:Selector的提供者 此时为 WindowsSelectorProvider
selectStrategyFactory: 默认的选择器策略,此时为 DefaultSelectStrategyFactory 对象
RejectedExecutionHandlers.reject():拒绝的策略直接抛出了RejectedExecutionException

(3)这里进入到一个关键点 NioEventLoopGroup 类中的newChild 具体 NioEventLoop的初始化方法:

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory)args[3] : null;
    return new NioEventLoop(this, executor, (SelectorProvider)args[0], ((SelectStrategyFactory)args[1]).newSelectStrategy(), (RejectedExecutionHandler)args[2], queueFactory);
}

进入NioEventLoop 类的构造方法:

 private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);
    private static final int CLEANUP_INTERVAL = 256;
    private static final boolean DISABLE_KEY_SET_OPTIMIZATION = SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
    private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
    private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
    private final IntSupplier selectNowSupplier = new IntSupplier() {
        public int get() throws Exception {
            return NioEventLoop.this.selectNow();
        }
    };
    private Selector selector;
    private Selector unwrappedSelector;
    private SelectedSelectionKeySet selectedKeys;
    private final SelectorProvider provider;
    private static final long AWAKE = -1L;
    private static final long NONE = Long.MAX_VALUE;
    private final AtomicLong nextWakeupNanos = new AtomicLong(-1L);
    private final SelectStrategy selectStrategy;
    private volatile int ioRatio = 50;
    private int cancelledKeys;
    private boolean needsToSelectAgain;

// 参数: executor 线程工厂类newDefaultThreadFactory 
// selectorProvider:Selector的提供者 此时为 WindowsSelectorProvider 
// selectStrategyFactory: 默认的选择器策略,此时为  DefaultSelectStrategyFactory 对象
// RejectedExecutionHandlers.reject():拒绝的策略直接抛出了RejectedExecutionException
// queueFactory null
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) {
	// 调用父类方法,完成任务队列的赋值
    super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler);
    // selectorProvider 赋值将 WindowsSelectorProvider 赋值
    this.provider = (SelectorProvider)ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    // 选择策略赋值
    this.selectStrategy = (SelectStrategy)ObjectUtil.checkNotNull(strategy, "selectStrategy");
    // netty 自己优化的Selector SelectorTuple
    SelectorTuple selectorTuple = this.openSelector();
    // 赋值 selector 
    this.selector = selectorTuple.selector;
    // unwrappedSelector 中既有selector又有存放被优化后的 selectionKeys 
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
}

到此可以看到NioEventLoop 的初始化完成了,不过在NioEventLoop 初始化中有些关键点需要注意:

关键点1 :newTaskQueue(queueFactory):队列创建,可以看到改队列的大小是integer 的最大值,并且队列的消费是多生产者一个消费者,此消费模型以为这每个NioEventLoop 实际上只有一个线程在进行任务的处理

// 创建多个线程充当生产者,只有一个线程充当消费者
private static Queue<Runnable> newTaskQueue(EventLoopTaskQueueFactory queueFactory) {
	// queueFactory 为null 
	// 进入 newTaskQueue0(DEFAULT_MAX_PENDING_TASKS) 创建队列
	// 队列长度为int 的最大值
    return queueFactory == null ? newTaskQueue0(DEFAULT_MAX_PENDING_TASKS) : queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
    return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.newMpscQueue() : PlatformDependent.newMpscQueue(maxPendingTasks);
}
public static <T> Queue<T> newMpscQueue() {
     return PlatformDependent.Mpsc.newMpscQueue();
 }

super 方法调用 SingleThreadEventLoop :

protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16, SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
    private final Queue<Runnable> tailTasks;
// executor     线程工厂类newDefaultThreadFactory 
// addTaskWakesUp false
// taskQueue 多生产者一个消费者的队列
// tailTaskQueue多生产者一个消费者的队列
// rejectedExecutionHandler :拒绝的策略直接抛出了RejectedExecutionException
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue, RejectedExecutionHandler rejectedExecutionHandler) {
	
    super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
    // 赋值任务队列
    this.tailTasks = (Queue)ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
}

super 方法进入SingleThreadEventExecutor :

// executor     线程工厂类newDefaultThreadFactory 
// addTaskWakesUp false
// taskQueue 多生产者一个消费者的队列
// rejectedExecutionHandler :拒绝的策略直接抛出了RejectedExecutionException
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) {
        // 为AbstractEventExecutor 每个childEvent 赋值其父类NioEventLoopGroup 对象
        super(parent);
        this.threadLock = new CountDownLatch(1);
        this.shutdownHooks = new LinkedHashSet();
        this.state = 1;
        // 线程异步任务执行
        this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
        this.addTaskWakesUp = addTaskWakesUp;
        // 最大任务数
        this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
        // executor 赋值
        this.executor = ThreadExecutorMap.apply(executor, this);
        // 队列赋值
        this.taskQueue = (Queue)ObjectUtil.checkNotNull(taskQueue, "taskQueue");
        // 拒绝策略
        this.rejectedExecutionHandler = (RejectedExecutionHandler)ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }

关键点2 : netty 中对于Selector 的改造 SelectorTuple openSelector():通过此方法可以看到 对于要遍历的 selectKey 底层是用数组进行了替换

private SelectorTuple openSelector() {
  final AbstractSelector unwrappedSelector;
    try {
    	// 通过 WindowsSelectorProvider 初始化:WindowsSelector
        unwrappedSelector = this.provider.openSelector();
    } catch (IOException var7) {
        throw new ChannelException("failed to open a new selector", var7);
    }

    if (DISABLE_KEY_SET_OPTIMIZATION) {
        return new SelectorTuple(unwrappedSelector);
    } else {
    	// 获取SelectorImpl 的类
        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            public Object run() {
                try {
                    return Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());
                } catch (Throwable var2) {
                    return var2;
                }
            }
        });
        if (maybeSelectorImplClass instanceof Class && ((Class)maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
            final Class<?> selectorImplClass = (Class)maybeSelectorImplClass;
            final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
            // 构建数组类型的 selectedKeySet  替换掉 SelectorImpl 中原有的
            //  Set<SelectionKey> selectedKeys 和 Set<SelectionKey> publicSelectedKeys
            Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                public Object run() {
                    try {
                        Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                        Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
                        if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                            long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                            long publicSelectedKeysFieldOffset = PlatformDependent.objectFieldOffset(publicSelectedKeysField);
                            if (selectedKeysFieldOffset != -1L && publicSelectedKeysFieldOffset != -1L) {
                                PlatformDependent.putObject(unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                                PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                                return null;
                            }
                        }

                        Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                        if (cause != null) {
                            return cause;
                        } else {
                            cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                            if (cause != null) {
                                return cause;
                            } else {
                                selectedKeysField.set(unwrappedSelector, selectedKeySet);
                                publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                                return null;
                            }
                        }
                    } catch (NoSuchFieldException var7) {
                        return var7;
                    } catch (IllegalAccessException var8) {
                        return var8;
                    }
                }
            });
            if (maybeException instanceof Exception) {
                this.selectedKeys = null;
                Exception e = (Exception)maybeException;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
                return new SelectorTuple(unwrappedSelector);
            } else {
            	// 赋值成功后将优化后的 selectedKeySet 赋值给 selectedKeys 
                this.selectedKeys = selectedKeySet;
                logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
                // 返回优化后的selector 
                return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
            }
        } else {
            if (maybeSelectorImplClass instanceof Throwable) {
                Throwable t = (Throwable)maybeSelectorImplClass;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
            }

            return new SelectorTuple(unwrappedSelector);
        }
    }
}

至此NioEventLoopGroup 对象初始化完毕,代码嵌套内容还是比较深的;

3 NioEventLoopGroup 初始化的总结:

  • NioEventLoopGroup 中在设置核心线程数之后,会创建对应核心线程数的事件处理器,如果没有设置“io.netty.eventLoopThreads” 参数 ,则核心线程数为当前电脑cpu的核心数量*2;
  • 每个事件处理都拥有一个自己的Selector 事件选择器,并且拥有创建线程执行任务的能力;
  • 每个事件处理器中都会有一个线程来执行任务,模式是多生产者,一个消费者模型,如果任务无法及时处理会被放入到队列长度为integer最大值的任务队列;
  • NioEventLoopGroup 初始化的bossGroup 后面会用来处理客户端的accept 时间,而wokerGroup 会用来处理客户端的读事件,以及服务端的写事件;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/628504.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用dataFEED OPC Suite将西门子PLC数据转发至阿里云RDS数据库

一 背景 工业现场级别的各种设备会产生大量的数据&#xff0c;这些数据包含生产过程的各种信息&#xff0c;在经过数据库等IT应用的处理后&#xff0c;可为企业提供全面的生产数据分析和决策支持。以往工厂的数据库通常部署在本地&#xff0c;然而得益于云计算的快速发展以及云…

k8s harbor镜像仓库搭建

1.前言 Harbor 是一个开源的云原生镜像仓库&#xff0c;用于存储和分发 Docker 镜像。它提供了一些安全性和管理方面的功能&#xff0c;使得用户可以更好地管理和共享 Docker 镜像 2.配置harbor搭建环境 harbor的搭建需要用到docker、docker-compose服务 docker搭建参考&am…

Vivado 下 IP核之双端口 RAM 读写

目录 Vivado 下 IP核之双端口 RAM 读写 1、RAM IP 核简介 2、实验任务 3、程序设计 3.1、RAM IP 核配置 3.2、顶层模块设计 &#xff08;1&#xff09;绘制波形图 4、编写代码 4.1、顶层模块 ip_2port_ram 4.2、RAM 写模块设计 4.3、ram_wr 模块代码 4.4、RAM 读模…

基于graalvm和java swing制作一个文件差异对比的原生应用,附源码

文章目录 1、DFDiff介绍2、软件架构3、安装教程3.1、编译为jar包运行3.2、编译为原生应用运行 4、运行效果图5、项目源码地址 1、DFDiff介绍 当前已实现的功能比较两个文件夹内的文件差异&#xff0c;已支持文件差异对比。 2、软件架构 软件架构说明 开发环境是在OpenJDK17&…

安装 Kafka

文章目录 1.选择操作系统2.配置 Java 环境3.安装 ZooKeeper4.安装 broker&#xff08;1&#xff09;安装 broker&#xff08;2&#xff09;验证是否安装正确 5.配置 broker&#xff08;1&#xff09;常规配置&#xff08;2&#xff09;主题的默认配置 6.配置 Kafka 集群&#x…

CAC2.0全新升级发布,为企业邮箱筑起安全壁垒!

5月31日&#xff0c;Coremail举办了【聚焦盗号&#xff0c;企业邮件安全的威胁分析与应对】直播交流会。直播会上Coremail邮件安全团队就邮箱盗号问题进行了深度分享。 面对如此肆虐的盗号现象和即将到来的攻击暴破高峰期&#xff0c;各行业应该如何应对防护邮箱安全呢&#xf…

什么是防火墙?它有什么作用?

作者&#xff1a;Insist-- 个人主页&#xff1a;insist--个人主页 作者会持续更新网络知识和python基础知识&#xff0c;期待你的关注 目录 一、什么是防火墙 二、防火墙的分类 1、软件防火墙 2、硬件防火墙 三、防火墙的作用 1、防止病毒 2、防止访问不安全内容 3、阻…

如何使用AI帮你制作PPT

一&#xff1a;前言 ChatGPT&#xff1a;智能AI助你畅聊天地 在现代人日益忙碌的生活中&#xff0c;难免需要一些轻松愉快的聊天来放松身心。而现在&#xff0c;有了 ChatGPT&#xff0c;轻松愉快的聊天变得更加智能、有趣且不受时间、地点限制&#xff01; 什么是 ChatGPT&…

嵌入式Linux系统中SPI 子系统基本实现

1、SPI 驱动源文件目录 Linux common spi driver kernel-4.14/drivers/spi/spi.c Linux 提供的通用接口封装层驱动 kernel-4.14/drivers/spi/spidev.c linux 提供的 SPI 通用设备驱动程序 kernel-4.14/include/linux/spi/spi.h linux 提供的包含 SPI 的主要数据结构和函数…

sourcetree的使用

目录 前言 一、Sourcetree简介 二、创建分支与合并分支 三、合并冲突问题 总结 前言 今天提交项目代码时,接触到非常好用方便的可视化Git管理提交软件Sourcetree,今天记录一下使用过程 一、Sourcetree简介 通过Git可以进行对项目的版本管理,但是如果直接使用Git的软件会比…

MinIO快速入门——在Linux系统上安装和启动

1、简介 MinIO 是一款基于Go语言发开的高性能、分布式的对象存储系统。客户端支持Java,Net,Python,Javacript, Golang语言。MinIO系统&#xff0c;非常适合于存储大容量非结构化的数据&#xff0c;例如图片、视频、日志文件、备份数据和容器/虚拟机镜像等。 2、环境搭建&#…

【MySQL】一文带你了解数据库约束

文章目录 1. 约束类型2&#xff0e;PRIMARY KEY&#xff1a;主键约束3&#xff0e;FOREIGN KEY&#xff1a;外键约束4&#xff0e;NOT NULL&#xff1a;非空约束5&#xff0e;UNIQUE&#xff1a;唯一约束5&#xff0e;DEFAULT&#xff1a;默认值约束6&#xff0e;总结 1. 约束类…

用数据说话,R语言有哪七种可视化应用?

今天&#xff0c;随着数据量的不断增加&#xff0c;数据可视化成为将数字变成可用的信息的一个重要方式。R语言提供了一系列的已有函数和可调用的库&#xff0c;通过建立可视化的方式进行数据的呈现。在使用技术的方式实现可视化之前&#xff0c;我们可以先和AI科技评论一起看看…

SpringBoot 源码分析准备应用上下文(2)-prepareContext

一、入口 /*** Run the Spring application, creating and refreshing a new* {link ApplicationContext}.* param args the application arguments (usually passed from a Java main method)* return a running {link ApplicationContext}*/public ConfigurableApplicationC…

生成测试报告,在Unittest框架中就是简单

测试套件&#xff08;Test Suite&#xff09;是测试用例、测试套件或两者的集合&#xff0c;用于组装一组要运行的测试&#xff08;多个测试用例集合在一起&#xff09;。 &#xff08;1&#xff09;创建一个测试套件&#xff1a; import unittest suite unittest.TestSuite…

车载测试:详解ADAS传感器(相机)标定数据采集方法

1.基本原理 相机外参标定&#xff0c;通过拍摄多角度棋盘格标定相机外参。 2.外参标定板设计 标定板分为垂直标定板和水平标定板&#xff0c;由于地面的水平标定板不容易被检测到&#xff0c;本文采用垂直标定板进行相机标定。 在标定过程中标定板需要和车身坐标成正交状态…

中国人民大学与加拿大女王大学金融硕士——所有的为时已晚都是恰逢其时

你是否有过同样的感觉&#xff0c;工作之余想学点什么又觉得有点晚了&#xff0c;心里反复纠结&#xff0c;总是没个结果。记得在网上看到过一句话&#xff0c;你觉得为时已晚的时候&#xff0c;恰恰是最早的时候。与其在心里反复琢磨&#xff0c;不如去付诸行动。中国人民大学…

超详细,自动化测试-Allure测试报告动态生成用例/标题(实战撸码)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 pytest 结合 allu…

Android-源码分析-MTK平台BUG解决:客户电池NTC功能(移植高低温报警,关机报警功能)---第一天分析与解决

MTK平台BUG解决&#xff1a;客户电池NTC功能 一、概述二、步骤1&#xff1a;实现目的&#xff1f;2&#xff1a;准备工作&#xff1a;机制原理的学习&#xff08;1&#xff09;MTK充电温度保护机制&#xff08;2&#xff09;MTKthermal高温充电机制 3&#xff1a;定位查找与源码…

提高自动化测试效率 , WEB自动化框架的基础封装模块!

目录 前言 一、环境搭建 1. Python环境 2. Selenium安装 3. Chrome浏览器 二、基础封装模块介绍 1. 代码框架介绍 2. 使用示例 三、总结 前言 在软件测试中&#xff0c;WEB自动化测试已成为不可或缺的一部分。WEB自动化测试涉及到大量的代码编写&#xff0c;为了提高…