Netty系列-1 NioEventLoopGroup和NioEventLoop介绍

news2024/11/16 3:39:05

背景

从本文开始开启一个新的专题Netty系列,用于收集Netty相关的文章,内容包含Netty的使用方式、运行原理等。

基于io.netty:netty-all:4.1.49.Final版本进行介绍

1.NioEventLoopGroup

介绍NioEventLoopGroup之前,有几个相关的组件需要提前介绍。

1.1 SelectorProvider

NIO中用于创建通道channel和选择器selector接口的类,本质是对操作系统API进行的封装;属于NIO知识范畴(可参考IO系列文章),不是本文讨论的对象。

1.2 SelectStrategy

选择策略,这里结合接口的定义和实现以及使用进行说明。
接口定义如下:

public interface SelectStrategy {
    int SELECT = -1;
    int CONTINUE = -2;
    int BUSY_WAIT = -3;

    int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;
}
// 只有一个get方法,无参地获取一个整型变量。
public interface IntSupplier {
    int get() throws Exception;
}

SelectStrategy中定义了三个宏定义常量, 其中有个常量SelectStrategy.SELECT值为-1;定义了一个calculateStrategy方法,接收两个参数,并计算出策略结果。

默认实现类如下:

final class DefaultSelectStrategy implements SelectStrategy {
    static final SelectStrategy INSTANCE = new DefaultSelectStrategy();

    private DefaultSelectStrategy() {}

    @Override
    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }
}

逻辑较为简单:如果hasTasks布尔参数为true, 理解为有任务待执行,则从selectSupplier获取值并返回;如果hasTasks布尔参数为false, 理解为无任务待执行,则直接返回SelectStrategy.SELECT。

使用场景:
为理解方便,再结合SelectStrategy再Netty中的使用场景进行介绍。
在Netty的死循环中,每次循环伊始调用selectStrategy的calculateStrategy方法,计算strategy值,根据strategy值确定进行select阻塞还是执行处理任务:

@Override
protected void run() {
	//...
	for (;;) {
		//...
		int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
		switch (strategy) {
			case SelectStrategy.SELECT:
				// 执行select阻塞
			default:
				//...
				processSelectedKeys();
				//...
				runAllTasks();
		}
		//...
	}
	//...
}

其中selectNowSupplier的实现如下:

private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return selectNow();
    }
};

int selectNow() throws IOException {
    //选择器的selectNow方法,无阻塞地返回已就绪的通道数;没有就绪的通道,否则返回0
    return selector.selectNow();
}

此时,调用IntSupplier的get()获取的是已就绪(如客户端发送了可读数据)的通道数。
hasTasks()的实现如下:

protected boolean hasTasks() {
    return super.hasTasks() || !tailTasks.isEmpty();
}

protected boolean hasTasks() {
    return !taskQueue.isEmpty();
}

判断任务队列是否有任务积存。
综上,Netty的死循环中,每次循环判断一下任务队列是否有任务积存,如果有,则执行processSelectedKeys和runAllTasks方法,否则执行select阻塞。

工厂类:
Netty为DefaultSelectStrategy提供了一个工厂类DefaultSelectStrategyFactory:

public final class DefaultSelectStrategyFactory implements SelectStrategyFactory {
    public static final SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory();

    private DefaultSelectStrategyFactory() { }

    @Override
    public SelectStrategy newSelectStrategy() {
        return DefaultSelectStrategy.INSTANCE;
    }
}

因此,可通过DefaultSelectStrategyFactory.INSTANCE获取单例DefaultSelectStrategy.INSTANCE对象。

1.3 RejectedExecutionHandler

拒绝处理器,这里结合接口的定义和实现以及使用进行说明。
接口定义:

public interface RejectedExecutionHandler {
    void rejected(Runnable task, SingleThreadEventExecutor executor);
}

RejectedExecutionHandler仅定义了一个方法,rejected接受两个参数,Runnable任务和SingleThreadEventExecutor线程池对象。
匿名实现类如下:

// RejectedExecutionHandlers.java中

// REJECT静态属性
private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
    @Override
    public void rejected(Runnable task, SingleThreadEventExecutor executor) {
        throw new RejectedExecutionException();
    }
};

// 静态方法,获取REJECT静态属性
public static RejectedExecutionHandler reject() {
    return REJECT;
}

直接抛出RejectedExecutionException异常。
可通过RejectedExecutionHandlers.reject()获取该RejectedExecutionHandler单例对象。

使用场景:

protected void addTask(Runnable task) {
    ObjectUtil.checkNotNull(task, "task");
    if (!offerTask(task)) {
        reject(task);
    }
}

protected final void reject(Runnable task) {
    // 抛出RejectedExecutionException异常
    rejectedExecutionHandler.rejected(task, this);
}

向任务队列(1.2章节中提到的taskQueue和tailTasks队列)中添加任务失败时,调用rejectedExecutionHandler的rejected方法抛出RejectedExecutionException异常。

1.4 EventExecutorChooser

线程选择器,这里结合接口的定义和实现以及使用进行说明。
接口定义:

interface EventExecutorChooser {
    EventExecutor next();
}

从接口定义可以看出,线程选择器的next()将会从线程池中返回一个线程。
两个实现类:

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
}

private static final class GenericEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    GenericEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }
}

PowerOfTwoEventExecutorChooser和GenericEventExecutorChooser作为EventExecutorChooser实现类,内部都维持了一个 EventExecutor[]线程数组executors对象和AtomicInteger原子计数器idx.
next()方法将根据计数器idx计算出一个下标,根据下标从executors数组中选择一个线程返回。二者的区别是计算下表的方式不同:GenericEventExecutorChooser通过取模获取索引下标;而PowerOfTwoEventExecutorChooser通过位运算(按位与&)来快速计算索引,这种方式在EventLoop数量是2的幂次方时,能够显著提高性能。
工厂方法:

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @SuppressWarnings("unchecked")
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        // 数组长度是否为2的幂次方
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }
    
    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }
    
    // ...
}

使用场景:
NioEventLoopGroup可以理解为一个线程池,内部维持了一个NioEventLoop[]线程数组。当NioEventLoopGroup接受任务时,调用next()方法从NioEventLoop[]数组中获取一个NioEventLoop线程对象,并将任务委托给NioEventLoop对象执行,如下所示:

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

public ChannelFuture register(ChannelPromise promise) {
    return next().register(promise);
}

public ChannelFuture register(Channel channel, ChannelPromise promise) {
    return next().register(channel, promise);
}

public Future<?> submit(Runnable task) {
    return next().submit(task);
}

public <T> Future<T> submit(Runnable task, T result) {
    return next().submit(task, result);
}

public <T> Future<T> submit(Callable<T> task) {
    return next().submit(task);
}

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    return next().schedule(command, delay, unit);
}

public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
    return next().schedule(callable, delay, unit);
}

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
    return next().scheduleAtFixedRate(command, initialDelay, period, unit);
}

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
    return next().scheduleWithFixedDelay(command, initialDelay, delay, unit);
}

public void execute(Runnable command) {
    next().execute(command);
}

1.5 ThreadPerTaskExecutor

ThreadPerTaskExecutor定义如下:

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

其中,Executor是JUC中的线程池接口:

public interface Executor {
    void execute(Runnable command);
}

当有任务提交给ThreadPerTaskExecutor线程池时,会通过threadFactory线程工厂创建一个线程,并将该任务提交给该线程执行。

使用场景:
构造ThreadPerTaskExecutor对象时,需要传入一个线程工厂用于确定是否守护线程,名称、线程属组、优先级等,这里newDefaultThreadFactory构造线程工厂的内容不重要,重点在与包装ThreadPerTaskExecutor对象,并最终赋值给NioEventLoop的this.executor属性。这部分将在NioEventLoop章节中介绍。

1.6 NioEventLoopGroup介绍

本章节从构造函数以及属性的角度介绍NioEventLoopGroup:
可以通过参数指定NioEventLoopGroup使用的线程数量,不指定时使用CPU属性*2作为线程数。

// 使用1个线程
EventLoopGroup bossGroup = new NioEventLoopGroup(1);

// cpu*2
EventLoopGroup workerGroup = new NioEventLoopGroup();

无参和带参的构造函数如下:

public NioEventLoopGroup() {
	this(0);
}

public NioEventLoopGroup(int nThreads) {
	this(nThreads, (Executor) null);
}

进入MultithreadEventLoopGroup构造函数中:

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

而DEFAULT_EVENT_LOOP_THREADS在类加载阶段已确定:

private static final int DEFAULT_EVENT_LOOP_THREADS;

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
        "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
}

如果设置了"io.netty.eventLoopThreads"环境变量,则使用这个环境变量指定的值,否则使用CPU数*2.
构造函数通过super逐层调用父类的构造函数进入MultithreadEventExecutorGroup中,删除异常分支后的主线逻辑如下:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
    // 步骤1.构造ThreadPerTaskExecutor对象
	executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    // 步骤2.创建children数组
	children = new EventExecutor[nThreads];
	for (int i = 0; i < nThreads; i ++) {        
		children[i] = newChild(executor, args);           
	}

    // 步骤3.创建线程选择器
	chooser = chooserFactory.newChooser(children);
    
    // 添加terminationListener监听器...
    
    // 步骤4.再维护一份只读的children
	Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
	Collections.addAll(childrenSet, children);
	readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

步骤1: 创建ThreadPerTaskExecutor线程池对象,将作为参数构造NioEventLoop对象。
步骤2:根据线程数创建children数组,调用newChild依次创建线程,得到的children称为子线程组;
步骤3:将子线程组传递给线程选择器构造函数,创建线程选择器;
步骤4:在NioEventLoopGroup内部维护一份只读的children子线程组;
此后,NioEventLoopGroup拥有了一个NioEventLoop数组对象,以及一个chooser线程选择器,选择器的next()方法会依次从NioEventLoop数组对象中返回一个NioEventLoop对象,用于执行提交给NioEventLoopGroup的任务。

继续看一下newChild方法:

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}

这里传递给NioEventLoop构造参数的对象,如executor(ThreadPerTaskExecutor),SelectorProvider,SelectStrategy,RejectedExecutionHandler就是本章节介绍的组件。
另外,从NioEventLoopGroup的继承体系看,继承了ExecutorService具备了线程池的能力,继承了ScheduledExecutorService具备了定时任务的能力。
可以通过案例测试一下:

public static void main(String[] args) {
    EventLoopGroup group = new NioEventLoopGroup(4);
    for (int i = 0;i<10;i++) {
        group.submit(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("[test]"+Thread.currentThread().getName()+" executed.");
        });
    }
}

运行结果如下所示:

[test]nioEventLoopGroup-2-3 executed.
[test]nioEventLoopGroup-2-4 executed.
[test]nioEventLoopGroup-2-1 executed.
[test]nioEventLoopGroup-2-2 executed.
[test]nioEventLoopGroup-2-3 executed.
[test]nioEventLoopGroup-2-4 executed.
[test]nioEventLoopGroup-2-2 executed.
[test]nioEventLoopGroup-2-1 executed.
[test]nioEventLoopGroup-2-1 executed.
[test]nioEventLoopGroup-2-2 executed.

可以看出,group内部维护了4个线程,依次将任务提交给这四个线程处理。

2.NioEventLoop

如果将NioEventLoopGroup理解为线程池,NioEventLoop就是实际干活的线程, 且一旦启动后就不断地循环干活,如下所示:
在这里插入图片描述
NioEventLoop执行选择器的select方法阻塞后陷入阻塞,直到有任务或者IO事件才会唤醒NioEventLoop,依次处理IO事件、线程任务队列的任务,然后再次执行选择器的select方法阻塞后陷入阻塞,循环往复。
先从整体上对NioEventLoop的功能了解后,以下结合Netty源码对NioEventLoop进行详细介绍。

2.1 构造NioEventLoop

NioEventLoop构造函数:
继续NioEventLoopGroup中通过newChild方法构造NioEventLoop对象,进入NioEventLoop的构造函数:

NioEventLoop(
    NioEventLoopGroup parent, 
    Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, 
    RejectedExecutionHandler rejectedExecutionHandler,
             EventLoopTaskQueueFactory queueFactory) {
    super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
          rejectedExecutionHandler);
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    final SelectorTuple selectorTuple = openSelector();
    this.selector = selectorTuple.selector;
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
}

参数parent为NioEventLoopGroup, executor/selectorProvider/strategy/rejectedExecutionHandler在NioEventLoopGroup中已介绍过,queueFactory此时为空,将使用Netty默认的队列工厂生产队列,队列用于存放提交给NioEventLoop的任务。
NioEventLoop属性:

随着父类构造器的不断调用,NioEventLoop将拥有以下重要属性:

// 所属的NioEventLoopGroup对象
private final EventExecutorGroup parent;

// 用于创建NIO通道和选择器的SelectorProvider对象
private final SelectorProvider provider;

// 多路复用选择器
private Selector selector;

// 线程池
private final Executor executor;

// 两个任务队列
private final Queue<Runnable> taskQueue;
private final Queue<Runnable> tailTasks;

// 拒绝处理器
private final RejectedExecutionHandler rejectedExecutionHandler;

// 选择策略
private final SelectStrategy selectStrategy;

parent属性记录所属的NioEventLoopGroup对象;
provider用于后续创建NIO的通道和选择器;
selector多路复用选择器,Netty底层是依赖于NIO实现的,需要依赖selector;
taskQueue和tailTasks任务队列,用于存放待执行的任务(提交给NioEventLoop的任务);
rejectedExecutionHandler拒绝处理器,当向队列添加任务失败时,调用拒绝处理器抛出异常;
selectStrategy选择处理器,根据是否有任务确定是执行select阻塞还是执行任务。
executor属性需要详细介绍一下:

this.executor = ThreadExecutorMap.apply(executor, this);

创建给ThreadExecutorMap.apply方法2个参数:ThreadPerTaskExecutor类型的线程池executor对象,对于提交的每个任务都会创建一个线程,并将任务委托给该线程处理;另外一个参数是当前NioEventLoop对象。

public static Executor apply(final Executor executor,final EventExecutor eventExecutor) {
    return new Executor() {
        @Override
        public void execute(final Runnable command) {
            executor.execute(apply(command, eventExecutor));
        }
    };
}

该方法将返回一个线程池对象,当向该线程池对象提交任务时(调用execute并传递Runnable时),将调用ThreadPerTaskExecutor执行apply(command, eventExecutor)返回的任务,apply对该任务进行了增强:

public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
	return new Runnable() {
		@Override
		public void run() {
			setCurrentEventExecutor(eventExecutor);
			try {
				command.run();
			} finally {
				setCurrentEventExecutor(null);
			}
		}
	};
}

这是常见的ThreadLocal写法:

private static final FastThreadLocal<EventExecutor> mappings = new      FastThreadLocal<EventExecutor>();

private static void setCurrentEventExecutor(EventExecutor executor) {
    mappings.set(executor);
}

此时,在任务的前后进行了增强,将NioEventLoop保存到mappings中,并在任务执行完后删除。

总之,this.executor是一个Executor类,当该类的execute(Runnable command)方法被调用时,ThreadPerTaskExecutor会创建一个线程来执行这个任务,且这个任务前后将NioEventLoop对象添加到ThreadLocal中。
ThreadPerTaskExecutor创建的这个线程将会保存在NioEventLoop的thread属性中,每个NioEventLoop都会绑定一个线程,具体逻辑在下一节进行介绍。

2.2 启动NioEventLoop

NioEventLoopGroup group = new NioEventLoopGroup(4);

在NioEventLoopGroup对象以及NioEventLoop[]数组(以及数组元素)创建完成后,所有的NioEventLoop处于未启动状态,向其提交任务时会启动NioEventLoop循环。
可通过如下方式, 启动一个NioEventLoop:

Runnable task = () -> System.out.println(Thread.currentThread().getName());
group.submit(task);

// 或者
NioEventLoop next = (NioEventLoop)group.next();
next.submit(task);

分析:向NioEventLoopGroup提交任务时,NioEventLoopGroup会通过next()方法获取一个子NioEventLoop并将任务提交给改子NioEventLoop,因此上述两种方式完全一致。

状态变量和状态值:

在NioEventLoop(以及其父类)内部定义了两个状态变量,如下所示:

// NioEventLoop绑定的线程
private volatile Thread thread;

//NioEventLoop状态, 初始状态为未启动
private volatile int state = ST_NOT_STARTED;

//状态值定义如下:
// 未开始
private static final int ST_NOT_STARTED = 1;
// 已开始
private static final int ST_STARTED = 2;
// 正在关闭
private static final int ST_SHUTTING_DOWN = 3;
// 已停止
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;

其中thread是NioEventLoop绑定的线程,NioEventLoop的所有工作都由该线程来执行,包括选择器的select、作为线程执行task和定时task;每个NioEventLoop在启动后都会绑定一个thread,且整个生命周期不会发生变化。NioEventLoop可以理解为一个线程的原因就在于此属性,后面使用NioEventLoop线程表示该thread。

启动NioEventLoop

可以向NioEventLoop中提交两种任务,懒加载任务和正常任务。懒加载任务表示不着急执行,可以等NioEventLoop可以执行任务的时候再执行(从select阻塞被唤醒后),正常任务提交后,会强制唤醒select阻塞,并执行任务。

懒加载任务不是重点内容,且懒加载任务与正常任务的区别仅在于是否强制唤醒select, 以下为突出主线逻辑,省略这一部分的介绍。

向NioEventLoop提交任务后,进入如下流程:

public void execute(Runnable task) {
    execute(task, true);
}

private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = inEventLoop();
    // 将任务提交到`Queue<Runnable> taskQueue`队列中,等待执行
    addTask(task);
    if (!inEventLoop) {
        startThread();
        // NioEventLoop是否已停止,停止则拒绝任务,并抛出异常
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {}
            if (reject) {
                reject();
            }
        }
    }
    
    // addTaskWakesUp由构造函数传入-固定为false
    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}

这段代码的和核心逻辑在于 addTask(task)和startThread(),前者将任务保存至任务队列,后者启动线程。

boolean inEventLoop = inEventLoop() 判断当前线程是否是NioEventLoop线程,只有NioEventLoop线程自己向NioEventLoop提交任务时,才返回true, 其他线程均返回false。

if (!inEventLoop) {
    startThread();
    //...
}

inEventLoop为false时才会执行startThread(),因为inEventLoop为true表示当前任务由NioEventLoop线程提交,即NioEventLoop线程已启动,因此不需要再次调用startThread方法(后续inEventLoop判断逻辑也是这个原因,不再介绍)。

wakeup(inEventLoop)方法如下所示:

protected void wakeup(boolean inEventLoop) {
    // nextWakeupNanos稍后介绍
    if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
        selector.wakeup();
    }
}

调用选择器的wakeup()方法强制唤醒阻塞在selector上的NioEventLoop线程。
进入startThread方法:

private void startThread() {
	if (state == ST_NOT_STARTED) {
		if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
			boolean success = false;
			try {
				doStartThread();
				success = true;
			} finally {
				if (!success) {
					STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
				}
			}
		}
	}
}

通过状态变量和原子性的操作保证了doStartThread()方法只会执行一次,doStartThread方法的核心逻辑如下:

private void doStartThread() {
	executor.execute(new Runnable() {
		@Override
		public void run() {
			thread = Thread.currentThread();
			// ...
			SingleThreadEventExecutor.this.run();
			// ...
		}
	});
}

前面已介绍过,executor.execute执行一个Runnable任务的时,直接创建一个新的线程,并将Runnable任务提交给这个新创建的线程。通过thread = Thread.currentThread()将新创建的线程赋值给thread属性,用于NioEventLoop与thread绑定;这个线程继续执行SingleThreadEventExecutor.this.run()从而启动NioEventLoop。

run方法:
该run方法的实现位于NioEventLoop类中,省去框架代码和简化异常逻辑后如下所示:

protected void run() {
	int selectCnt = 0;
	for (;;) {
		try {
			int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
			switch (strategy) {
				case SelectStrategy.SELECT:
					long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
					if (curDeadlineNanos == -1L) {
						curDeadlineNanos = NONE;
					}
					nextWakeupNanos.set(curDeadlineNanos);
					try {
						if (!hasTasks()) {
							strategy = select(curDeadlineNanos);
						}
					} finally {
						nextWakeupNanos.lazySet(AWAKE);
					}
					// fall through
				default:
					;
			}
		} catch (IOException e) {
			rebuildSelector0();
			selectCnt = 0;
			handleLoopException(e);
			continue;
		}

		selectCnt++;
		cancelledKeys = 0;
		needsToSelectAgain = false;
		final int ioRatio = this.ioRatio;
		boolean ranTasks;
		if (ioRatio == 100) {
			try {
				if (strategy > 0) {
					processSelectedKeys();
				}
			} finally {
				ranTasks = runAllTasks();
			}
		} else if (strategy > 0) {
			final long ioStartTime = System.nanoTime();
			try {
				processSelectedKeys();
			} finally {
				final long ioTime = System.nanoTime() - ioStartTime;
				ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
			}
		} else {
			ranTasks = runAllTasks(0);
		}

		if (ranTasks || strategy > 0) {
			if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
				logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
						selectCnt - 1, selector);
			}
			selectCnt = 0;
		} else if (unexpectedSelectorWakeup(selectCnt)) {
			selectCnt = 0;
		}
	}
}

先从整体上看,这段代码的结构如下:

protected void run() {
	int selectCnt = 0;
	for (;;) {
		int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
		switch (strategy) {
			case SelectStrategy.SELECT:
				// select.select()阻塞
			default:
				;
		}

		//...
		
		// 执行就绪的SelectedKeys
		processSelectedKeys();
		
		//...
		
		// 处理任务队列的任务
		ranTasks = runAllTasks();
		
		//...
	}
}

通过hasTasks()判断NioEventLoop的任务队列中是否有任务:如果没有,strategy返回-1,执行select.select()陷入阻塞(或者返回已就绪的IO事件);如果任务队列中有任务,则执行strategy返回selector.selectNow()的值(已就绪的IO事件)且不阻塞。
processSelectedKeys()会遍历已就绪的IO事件,对应SelectionKey(包含通道、选择器、就绪事件信息),依次处理。
runAllTasks()依次从任务队列取出任务并执行。
说明hasTasks()有任务时,执行selector.selectNow()不阻塞,从而保证了一定会执行runAllTasks()方法; 无任务时,执行selector.select()或者selector.select(timeout)可能陷入阻塞。

processSelectedKeys()牵连内容较多,后续介绍消息处理流程时再进行介绍。

整体上理解run方法逻辑后,再看一下3处细节。
[1] select逻辑

// 获取下一次定时任务执行的时间
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
    // 没有定时任务创建
    curDeadlineNanos = NONE;
}
nextWakeupNanos.set(curDeadlineNanos);
try {
    if (!hasTasks()) {
        // 根据是否有定时任务,执行select或者select(timeout)
        strategy = select(curDeadlineNanos);
    }
} finally {
    nextWakeupNanos.lazySet(AWAKE);
}

因为NioEventLoop继承了ScheduledExecutorService, 自然具备定时线程池的能力,因此存在定时任务队列。
执行select前,判断定时任务队列是否有任务:如果没有,则执行selecor.select()陷入持续阻塞状态;如果有,获取下一次定时任务的执行时间,执行selecor.select(timeout), 在定时任务执行时从阻塞中醒来, 保证定时任务按时执行。

[2] 异常逻辑

protected void run() {
	int selectCnt = 0;
	for (;;) {
		try {
			//...
		} catch (IOException e) {
			rebuildSelector0();
			selectCnt = 0;
			handleLoopException(e);
			continue;
		}
		//...
	}
}

当有IO异常发送时,需要重塑选择器,否则这个NioEventLoop对象将处于异常状态:https://github.com/netty/netty/issues/8566.
重塑过程包括:重新创建selector选择器对象,将注册到就选择器对象的通道全部取消,并重新注册到新的选择器上。
[3] ioRatio比率
ioRatio比率用于控制处理IO事件与执行任务队列任务占用CPU的比率。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2076508.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

idea导入maven项目(别人的项目)爆红

作为一个经常学习交流的人&#xff0c;或者工作需要&#xff0c;我们都或多或少会把别人写好的代码拷贝过来学习或编辑&#xff0c;大多数时候都是把整个项目拿过来;但是往往把代码拿到之后放在自己电脑用 idea 打开的时候就会出现 pom.xml 文件红线报错&#xff0c;然后倒入的…

大模型企业应用落地系列》基于大模型的对话式推荐系统》技术架构设计全攻略

注&#xff1a;此文章内容均节选自充电了么创始人&#xff0c;CEO兼CTO陈敬雷老师的新书《自然语言处理原理与实战》&#xff08;人工智能科学与技术丛书&#xff09;【陈敬雷编著】【清华大学出版社】 文章目录 大模型企业应用落地系列全貌基于大模型的对话式推荐系统》技术架…

如何使用ssm实现投稿系统+vue

TOC ssm231论文投稿系统vue 系统概述 1.1 研究背景 如今互联网高速发展&#xff0c;网络遍布全球&#xff0c;通过互联网发布的消息能快而方便的传播到世界每个角落&#xff0c;并且互联网上能传播的信息也很广&#xff0c;比如文字、图片、声音、视频等。从而&#xff0c;…

软件测试 | 概念(1)

目录 前言 需求的概念 开发模型 软件的生命周期 常见开发模型 瀑布模型 螺旋模型 增量模型&#xff0c;迭代模型 敏捷模型 Scrum模型 测试模型 V模型 W模型&#xff08;双V模型&#xff09; 前言 测试&#xff1a;验证软件的特性是否满足用户的需求。 用户的需求…

vue3前端界面布置到服务器,使用户能用网址访问到界面

1.下载Nginx&#xff1a; nginx: download 2.下载好的Nginx解压缩&#xff0c; 在解压缩的文件夹下找到conf > nginx.conf&#xff0c;修改nginx.conf中的server&#xff0c;配置服务器的ip地址和端口号 3.执行npm run build命令&#xff0c;vue生成的dist下的文件全部放置在…

基于微信小程序的行李寄存管理系统的设计与实现(论文+源码)_kaic

基于微信小程序的行李寄存管理系统的设计与实现(论文源码)_kaic 摘 要 人们外出旅行的时候&#xff0c;经常会需要到行李寄存的服务。行李寄存处在全国各地都很常见。现存的行李寄存方式很传统&#xff0c;适合小规模的行李寄存&#xff0c;当行李数量较多时&#xff0c;就…

【领域驱动设计 打通DDD最小闭环】三 模型的建立-领域建模

本篇BLOG为DDD流程的第二步&#xff0c;在模型的建立阶段&#xff0c;领域专家与技术人员通过领域建模来完成更为细致的模型建立讨论 领域建模的目的 领域建模主要有两个目的&#xff1a; 将知识可视化&#xff0c;准确、深刻地反映领域知识&#xff0c;并且在业务和技术人…

神经网络——非线性激活

1 非线性激活 1.1 几种常见的非线性激活&#xff1a; ReLU (Rectified Linear Unit)线性整流函数 Sigmoid 1.2代码实战&#xff1a; 1.2.1 ReLU import torch from torch import nn from torch.nn import ReLUinputtorch.tensor([[1,-0.5],[-1,3]])inputtorch.reshape(…

HT97226 160mW免输出耦合电容的立体声耳机放大器

特点&#xff1a; 输出无需隔直流电容 卓越的低音效果 无咔嗒/噼噗声&#xff0c;50uV (typical) Vos 低THDN:最低0.002% 低噪声,VN: 8.5uV 支持单端输入和全差分输入 2.5V至6V较宽的电源工作范围 输出功率:80mW(fIN1kHz,VDD3.6V,RL32Ω, THDN1%) 160mW(PVDD5V,fIN1kHz,RL32Ω…

Java中的抽象类 abstract

抽象方法&#xff1a; 将共性的行为&#xff08;方法&#xff09;抽取到父类之后。由于每一个子类执行的内容不一样&#xff0c;所以&#xff0c;在父类中不能确定具体的方法体。该方法就可以定义为抽象方法。 抽象类 如果一个类中存在抽象方法&#xff0c;那么该类就必须声…

【软件测试】软件测试-----概念篇

软件测试相关概念 一.需求的相关概念1.1 用户需求1.2 软件需求 二. 开发模型2.1 模型的基本概念.2.2 软件的生命周期2.2.1 理解软件生命周期每个阶段的具体任务 2.3 常见的开发模型.2.3.1 瀑布模型(适用场景&#xff1a;需求固定的小项目).2.3.2 螺旋模型(适用场景&#xff1a;…

ollama+llama3.1 405B 简介

ollamallama3.1 简介 Llama 3.1是一款来自Meta的最新型号&#xff0c;提供8B、70 B和405 B模型。 llama3.1:latestllama3.1:8bllama3.1:70bllama3.1:405bllama3.1:8b-instruct-fp16llama3.1:8b-instruct-q2_Kllama3.1:8b-instruct-q3_K_Sllama3.1:8b-instruct-q3_K_Mllama3.1…

python如何调用另一个文件中的函数

在同一个文件夹下 调用函数&#xff1a; A.py文件&#xff1a; def add(x,y):print(和为&#xff1a;%d%(xy)) B.py文件&#xff1a; import A A.add(1,2) 或 from A import add add(1,2) 在不同文件夹下 A.py文件的文件路径&#xff1a;E:\PythonProject\winycg B.py文件&a…

构建并升级openssh至OpenSSH_9.8p1

组件说明OpenSSH_9.8p1最新版本&#xff08;2024年8月&#xff09;OpenSSL 1.1.1pCentOS7中默认是OpenSSL 1.0.2k-fips 26 Jan 2017版本&#xff0c;OpenSSH_9.8p1不支持CentOS7主要是因为有大量CentOS老版本需要升级RPM&#xff0c;需要适配&#xff0c;故选择此版本。AnolisO…

输入一个正的奇数n(1≤n≤9),打印一个高度为n的、由“*”组成的沙漏图案。当n=5时,输出如下沙漏图案:

输入一个正的奇数n&#xff08;1≤n≤9&#xff09;&#xff0c;打印一个高度为n的、由“*”组成的沙漏图案。当n5时&#xff0c;输出如下沙漏图案&#xff1a; int main(){int i,j,n,m;scanf("%d",&n);m n / 2;for(im1;i<1;i--){ //m1是中间数for(jm1-i;j&g…

音频筑基:为啥一个压缩率概念,中文搜索结果都是错的?

音频筑基&#xff1a;为啥一个压缩率概念&#xff0c;中文搜索结果都是错的&#xff1f; 缘起概念分析小结 缘起 最近看一些数据压缩类的文章&#xff0c;对不同场合下表达的压缩率概念分歧&#xff0c;产生了疑问。有的说&#xff0c;压缩率越小越好&#xff0c;有的又说&…

信刻光盘摆渡机——完全物理隔离,安全合规

信刻光盘摆渡机是一款跨网安全数据摆渡设备&#xff0c;用于不同等级网络之间数据跨网安全传输的需求&#xff0c;采用智能光盘机械手臂&#xff0c;模拟人工取放光盘&#xff0c;在保持物理隔离的条件下&#xff0c;安全合规实现网间信息系统数据库及文件同步、网间信息数据交…

下载B站视频作为PPT素材

下载B站视频作为PPT素材 1. 下载原理2. 网页分析3. 请求页面&#xff0c;找到数据4. 数据解析5. 音频、视频下载6. 合并音频与视频7. 完整代码 其实使用爬虫也不是第一次了&#xff0c;之前从网站爬过图片&#xff0c;下载过大型文件&#xff0c;如今从下载视频开始才想到要写一…

搭建自己的GPT

搭建自己的GPT 文章说明核心代码效果展示源码下载 文章说明 目前GPT的使用比较主流&#xff0c;现有开源大模型&#xff0c;可以拉取到本地进行部署&#xff0c;搭建属于自己的GPT对话工具&#xff1b;主要用于熟悉大模型的本地搭建&#xff1b;本文采用开源的Ollama进行服务提…

MyBatis中的#{}和${}区别、ResultMap使用、MyBatis常用注解方式、MyBatis动态SQL

#{}和${}区别&#xff1a; #{}&#xff1a;是占位符&#xff0c;采用预编译的方式sql中传值&#xff0c;防止sql注入&#xff0c;如果我们往sql中列值传递一般使用 #{}。 ${}&#xff1a;采用字符串拼接的方式直接拼接到sql语句中&#xff0c;一般不用于sql列值传递&#xf…