Netty 之 DefaultPromise 源码解析

news2024/11/17 2:43:49

  在解析Netty源码时,在解析NioEventLoop 创建过程中,有一段这样的代码。

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    chooser = chooserFactory.newChooser(children);
    
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };
    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }
	
	
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

  今天就围绕着加粗代码进行分析 。
  首先来看children是什么?从前面的代码children = new EventExecutor[nThreads],可以看出children是EventExecutor数组。
在这里插入图片描述
  而EventExecutor和NioEventLoop是什么关系呢?请看下图 。
在这里插入图片描述
  接着分析chooser = chooserFactory.newChooser(children);这一行代码,chooserFactory的默认值为DefaultEventExecutorChooserFactory类,在newChooser中使用不同的策略来获取NioEventLoop。

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
    	// 判断数组的长度是否是2的幂次方
        if (isPowerOfTwo(executors.length)) {
        	// 如果是2的倍数,则使用PowerOfTwoEventExecutorChooser
        	// 选择器策略
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
        	// 如果不是2的幂次方,则使用GenericEventExecutorChooser选择器策略
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
}

  DefaultEventExecutorChooserFactory类中两种策略执行结果一样,都是采用轮询方式,如果数组长度是2的幂次方,则用求&的方式来计算要取数组下标的索引值,如果不是2的幂次方,则用求余的方式来计算数组下标的索引值,为什么要区分一下呢? 因为求 & 的方式比求余的方式效率更高,如果数组长度不是2的倍数,则不能使用求 & 的方式 。其实像HashMap,MpscChunkedArrayQueue源码中都用到了求&的方式,如果阅读过相关源码的小伙伴肯定比较熟悉了,但如果还有小伙伴不明白,我们可以来看一个例子,假如数组的长度为14。
在这里插入图片描述
  当idx.getAndIncrement()的值为16和18时,和17和19时,计算出索引位置为0,1,则存在数组索引冲突,因此不符合轮询的条件。而在DefaultEventExecutorChooserFactory中使用 PowerOfTwoEventExecutorChooser和GenericEventExecutorChooser两种策略,主要是为了提高轮询效率 。
  这段代码先放一边。

final FutureListener<Object> terminationListener = new FutureListener<Object>() {
    @Override
    public void operationComplete(Future<Object> future) throws Exception {
        if (terminatedChildren.incrementAndGet() == children.length) {
            terminationFuture.setSuccess(null);
        }
    }
};

  先看后面的

for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}

  这一段代码 。

在这里插入图片描述
  因为NioEventLoop继承SingleThreadEventExecutor类,而SingleThreadEventExecutor类中实现了terminationFuture()方法。

private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);

public Future<?> terminationFuture() {
    return terminationFuture;
}

  从上面方法中得知,调用e.terminationFuture().addListener(terminationListener);这一行代码,实际上调用的是DefaultPromise的addListener()方法,进入addListener()方法 。

public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    checkNotNull(listener, "listener");
	// 使用同步的方式,避免并发
    synchronized (this) {
        addListener0(listener);
    }

    if (isDone()) {
        notifyListeners();
    }

    return this;
}

private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
    if (listeners == null) {
        listeners = listener;
    } else if (listeners instanceof DefaultFutureListeners) {
        ((DefaultFutureListeners) listeners).add(listener);
    } else {
        listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
    }
}

  光从上面两个方法,看不出什么东西,好像也看不清DefaultPromise的意图。 进入DefaultFutureListeners类中 。

final class DefaultFutureListeners {
    private GenericFutureListener<? extends Future<?>>[] listeners;
    private int size;
    private int progressiveSize; // the number of progressive listeners


    DefaultFutureListeners(
            GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
        listeners = new GenericFutureListener[2];
        listeners[0] = first;
        listeners[1] = second;
        size = 2;
        if (first instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
        if (second instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
    }

    public void add(GenericFutureListener<? extends Future<?>> l) {
        GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
        final int size = this.size;
        if (size == listeners.length) {
            this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
        }
        listeners[size] = l;
        this.size = size + 1;

        if (l instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
    }

    public void remove(GenericFutureListener<? extends Future<?>> l) {
        final GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
        int size = this.size;
        for (int i = 0; i < size; i ++) {
            if (listeners[i] == l) {
                int listenersToMove = size - i - 1;
                if (listenersToMove > 0) {
                    System.arraycopy(listeners, i + 1, listeners, i, listenersToMove);
                }
                listeners[-- size] = null;
                this.size = size;

                if (l instanceof GenericProgressiveFutureListener) {
                    progressiveSize --;
                }
                return;
            }
        }
    }

    public GenericFutureListener<? extends Future<?>>[] listeners() {
        return listeners;
    }

    public int size() {
        return size;
    }

    public int progressiveSize() {
        return progressiveSize;
    }
}

  从DefaultFutureListeners代码中可以看出DefaultFutureListeners和GenericFutureListener的关系,在DefaultFutureListeners实例中有一个GenericFutureListener数组。从DefaultFutureListeners的构造方法中可以看出, 默认初始化GenericFutureListener数组长度为2,因此再来理解addListener0()方法的原理就很简单了。

private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
    if (listeners == null) {
        listeners = listener;
    } else if (listeners instanceof DefaultFutureListeners) {
        ((DefaultFutureListeners) listeners).add(listener);
    } else {
        listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
    }
}

  如果向DefaultPromise添加一个listener,则用DefaultPromise的listener属性存储即可,如果向DefaultPromise添加两个listener,则用DefaultFutureListeners对象存储,但DefaultFutureListeners中listeners数组初始化长度为2,刚好存储添加的两个listener,如果向DefaultPromise添加的listener超过两个,则需要扩容DefaultFutureListeners的listener数组来存储了,扩容方式如上加粗代码,listener数组扩容为原来两倍,Netty这样做的原因,一方面是提升性能,另一方面为了节省存储空间吧。

  既然listener已经存储好了,什么时候调用operationComplete()方法呢?接下来看DefaultPromise的setSuccess()和setFailure()方法。
在这里插入图片描述
  当然,我们挑选setSuccess()看即可。 setFailure()方法和setSuccess()方法原理一样。

private boolean setSuccess0(V result) {
    return setValue0(result == null ? SUCCESS : result);
}

private boolean setValue0(Object objResult) {
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        if (checkNotifyWaiters()) {
            notifyListeners();
        }
        return true;
    }
    return false;
}

  看源码多的小伙伴肯定一眼就知道这里又用到了AQS。

private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
private volatile Object result;

  设置result的值时,如果其值当前为为null或UNCANCELLABLE,是可以被设置成功的,默认情况下,result值为null。那什么情况下会为UNCANCELLABLE呢? 请看DefaultPromise中的setUncancellable()方法 。

public boolean setUncancellable() {
    if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
        return true;
    }
    Object result = this.result;
    return !isDone0(result) || !isCancelled0(result);
}

  将result的值设置为UNCANCELLABLE后有什么影响呢? 请看cancel()方法 。

public boolean cancel(boolean mayInterruptIfRunning) {
    if (RESULT_UPDATER.get(this) == null &&
            RESULT_UPDATER.compareAndSet(this, null, new CauseHolder(new CancellationException()))) {
        if (checkNotifyWaiters()) {
            notifyListeners();
        }
        return true;
    }
    return false;
}

  当result的值必须为空时,cancel()方法才能调用成功,接着调用notifyListeners()方法,最终调用才Listener的operationComplete()方法 。如果 result的值被设置为UNCANCELLABLE后,cancel()方法将不会调用成功。 言归正传,继续看setValue0()的checkNotifyWaiters()方法 。

private synchronized boolean checkNotifyWaiters() {
    if (waiters > 0) {
        notifyAll();
    }
    return listeners != null;
}

  如果waiters大于0,则调用notifyAll()方法。 notifyAll()方法是Object类中的方法,就是唤醒所有等待线程的意思,接下来看waiters值何时修改的呢? 请看下图。
在这里插入图片描述

  在DefaultPromise类中有两个方法,incWaiters()和decWaiters()方法,这两个方法对waiters值做了修改,何时调用waiters的值呢?能举个例子不? 当 ChannelFuture cf = bootstrap.bind(9000).sync(); 的sync()方法调用时。
在这里插入图片描述
  也就是DefaultPromise的sync()方法被调用时。 waiters的值会++,同时会触发当前线程进入wait()。

public Promise<V> sync() throws InterruptedException {
    await();
    rethrowIfFailed();
    return this;
}


public Promise<V> await() throws InterruptedException {
    if (isDone()) {
        return this;
    }

    if (Thread.interrupted()) {
        throw new InterruptedException(toString());
    }

    checkDeadLock();

    synchronized (this) {
        while (!isDone()) {
            incWaiters();
            try {
                wait();
            } finally {
                decWaiters();
            }
        }
    }
    return this;
}

  因此在 setValue0() 方法的notifyListeners调用之前,先要唤醒正在等待的线程,接着调用notifyListeners()方法 。接下来进入notifyListeners()方法 。

private void notifyListeners() {
    EventExecutor executor = executor();
    if (executor.inEventLoop()) {
        final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
        final int stackDepth = threadLocals.futureListenerStackDepth();
        if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
            threadLocals.setFutureListenerStackDepth(stackDepth + 1);
            try {
                notifyListenersNow();
            } finally {
                threadLocals.setFutureListenerStackDepth(stackDepth);
            }
            return;
        }
    }

    safeExecute(executor, new Runnable() {
        @Override
        public void run() {
            notifyListenersNow();
        }
    });
}

  大家可能会想,上述这段代码什么意思?先来看executor()方法。

protected EventExecutor executor() {
    return executor;
}

  executor的值为GlobalEventExecutor。
在这里插入图片描述
  因此inEventLoop()方法实际上是调用GlobalEventExecutor的inEventLoop()方法 。

public boolean inEventLoop() {
    return inEventLoop(Thread.currentThread());
}

public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
}

  判断的依据就是thread是否是当前线程,那GlobalEventExecutor中的thread又在什么时候初始化的呢? 看DefaultPromise的safeExecute()方法 。

final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();

private final AtomicBoolean started = new AtomicBoolean();

private static void safeExecute(EventExecutor executor, Runnable task) {
    try {
        executor.execute(task);
    } catch (Throwable t) {
        rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
    }
}

  因为知道executor为GlobalEventExecutor,因此这里的execute方法实际为GlobalEventExecutor中的方法。

public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    addTask(task);
    if (!inEventLoop()) {
        startThread();
    }
}

private void addTask(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    taskQueue.add(task);
}


private void startThread() {
	// 保证同一时间,只有一个线程在执行消费任务操作
    if (started.compareAndSet(false, true)) {
        final Thread t = threadFactory.newThread(taskRunner);
        // Set to null to ensure we not create classloader leaks by holds a strong reference to the inherited
        // classloader.
        // See:
        // - https://github.com/netty/netty/issues/7290
        // - https://bugs.openjdk.java.net/browse/JDK-7008595
        AccessController.doPrivileged(new PrivilegedAction<Void>() {
            @Override
            public Void run() {
                t.setContextClassLoader(null);
                return null;
            }
        });
        // Set the thread before starting it as otherwise inEventLoop() may return false and so produce
        // an assert error.
        // See https://github.com/netty/netty/issues/4357
        thread = t;
        t.start();
    }
}

  大家发现没有,调用GlobalEventExecutor的execute方法,并没有直接执行任务的run()方法,而是将任务添加到taskQueue队列中,接着调用threadFactory的newThread()方法,创建一个新线程,新线程中执行TaskRunner任务,在TaskRunner的run()方法中,从队列中取出任务,并执行其run()方法 ,看 TaskRunner的源码 。

final class TaskRunner implements Runnable {
    @Override
    public void run() {
        for (;;) {
            Runnable task = takeTask();
            if (task != null) {
                try {
                	// 执行任务的run()方法
                    task.run();
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from the global event executor: ", t);
                }

                if (task != quietPeriodTask) {
                    continue;
                }
            }

            Queue<ScheduledFutureTask<?>> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue;
            // Terminate if there is no task in the queue (except the noop task).
            if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
                // Mark the current thread as stopped.
                // The following CAS must always success and must be uncontended,
                // because only one thread should be running at the same time.
                boolean stopped = started.compareAndSet(true, false);
                assert stopped;

                // Check if there are pending entries added by execute() or schedule*() while we do CAS above.
                if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
                    // A) No new task was added and thus there's nothing to handle
                    //    -> safe to terminate because there's nothing left to do
                    // B) A new thread started and handled all the new tasks.
                    //    -> safe to terminate the new thread will take care the rest
                    break;
                }

                // There are pending tasks added again.
                if (!started.compareAndSet(false, true)) {
                    // startThread() started a new thread and set 'started' to true.
                    // -> terminate this thread so that the new thread reads from taskQueue exclusively.
                    break;
                }

                // New tasks were added, but this worker was faster to set 'started' to true.
                // i.e. a new worker thread was not started by startThread().
                // -> keep this thread alive to handle the newly added entries.
            }
        }
    }
}

  上面这段代码还是令人费解的,先来看takeTask()方法 。

public Runnable takeTask() {
    BlockingQueue<Runnable> taskQueue = this.taskQueue;
    for (;;) {
        ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
        if (scheduledTask == null) {
            Runnable task = null;
            try {
                task = taskQueue.take();
            } catch (InterruptedException e) {
                // Ignore
            }
            return task;
        } else {
        	// 第一种情况,如果ScheduledFutureTask创建到代码执行到这一行为止,时间没有超过deadlineNanos,
        	// 则delayNanos = deadlineNanos - (currentTime - createTime)
        	// 第二种情况,如果ScheduledFutureTask是被移除了,并重新加入到scheduledTaskQueue中
        	// 则 delayNanos = deadlineNanos - (currentTime - 加入到scheduledTaskQueue中的时间)
            long delayNanos = scheduledTask.delayNanos();
            Runnable task;
            if (delayNanos > 0) {
                try {
                    task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
                } catch (InterruptedException e) {
                    // Waken up.
                    return null;
                }
            } else {
                task = taskQueue.poll();
            }

            if (task == null) {
                fetchFromScheduledTaskQueue();
                task = taskQueue.poll();
            }

            if (task != null) {
                return task;
            }
        }
    }
}


final ScheduledFutureTask<?> peekScheduledTask() {
    Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
    if (scheduledTaskQueue == null) {
        return null;
    }
    return scheduledTaskQueue.peek();
}

  上面 long delayNanos = scheduledTask.delayNanos();这一行代码,先看delayNanos()的计算规则。

ScheduledFutureTask{
    public long delayNanos() {
        return Math.max(0, deadlineNanos() - nanoTime());
    }
    
    public long deadlineNanos() {
        return deadlineNanos;
    }

	// ScheduledFutureTask实例创建时间
    private static final long START_TIME = System.nanoTime();
	
	// 当前时间减去创建时间
    public static long nanoTime() {
        return System.nanoTime() - START_TIME;
    }
}

  那scheduledTask的值是何时初始化的呢?
在这里插入图片描述
在这里插入图片描述
  从上图中得知, deadlineNanos的值来源于ScheduledFutureTask的deadlineNanos()方法,且在GlobalEventExecutor中传入的值为1秒。 而deadlineNanos的值与ScheduledFutureTask START_TIME静态变量初始化,以及ScheduledFutureTask实例化时间有关。 他们之间的关系为。

在这里插入图片描述
  假如SCHEDULE_QUIET_PERIOD_INTERVAL 的值为1秒,如果ScheduledFutureTask的START_TIME变量初始化到ScheduledFutureTask实例化花了0.5秒 ,则deadlineNanos的值为1.5秒。
在这里插入图片描述
  其实 netty 这么做原因就是要保证 delayNanos 有充足的1秒时间,排除掉ScheduledFutureTask初始化的时间,可能是一种更加精准的考虑吧。 在addTask() 方法中将任务加入到taskQueue中。
在这里插入图片描述
  根据上图第一种情况当执行到long delayNanos = scheduledTask.delayNanos(); 这一行代码的时间小于deadlineNanos时,和第二种情况,当执行到long delayNanos = scheduledTask.delayNanos(); 大于deadlineNanos时。
在这里插入图片描述
  对应执行代码行如上图所示 。 Netty这么做的目的可能是。
在这里插入图片描述
  addTask()方法,将任务添加到taskQueue队列中,和takeTask()方法从队列中获取任务,两者之间是异步的。 可能会存在taskQueue.poll()方法比taskQueue.add()方法先执行,因此这里设置一个deadlineNanos参数,来等待任务吧。

  接下来看fetchFromScheduledTaskQueue()这个方法 。

public void fetchFromScheduledTaskQueue() {
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    Runnable scheduledTask = pollScheduledTask(nanoTime);
    while (scheduledTask != null) {
        taskQueue.add(scheduledTask);
        scheduledTask = pollScheduledTask(nanoTime);
    }
}

protected final Runnable pollScheduledTask(long nanoTime) {
    assert inEventLoop();
    Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
    ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
    if (scheduledTask == null) {
        return null;
    }
	// 如果超过了deadlineNanos时间还没有从taskQueue队列中获取任务 
	// 则将scheduledTask任务添加到taskQueue中
    if (scheduledTask.deadlineNanos() <= nanoTime) {
        scheduledTaskQueue.remove();
        return scheduledTask;
    }
    return null;
}

  如果taskQueue中已经没有元素,则将scheduledTaskQueue队列中的元素添加到taskQueue队列中。
在这里插入图片描述
  上述 pollScheduledTask()方法有点像优先级队列,因为peek()方法取队头元素,但元素并没有弹出队列,如果时间还未达到,是不能从优先级队列中取出元素的,因此在pollScheduledTask()方法中,加了一行scheduledTask.deadlineNanos() <= nanoTime作为条件,如果时间还未达到,返回的是空,只有时间达到了,才能从scheduledTaskQueue中取到元素。

  在fetchFromScheduledTaskQueue()方法中写了一个while()循环,只要scheduledTaskQueue中有元素满足条件,则都加到taskQueue队列中,请看下面例子。
在这里插入图片描述

  自己写了一个MyGlobalEventExecutor继承GlobalEventExecutor,当然在netty源码中GlobalEventExecutor是final类型的,为了方便测试,我修改了netty源码相关内容,重新编译了netty源码,因此在我的代码中是可以实现的。 在GlobalEventExecutor的构造函数中, 向scheduledTaskQueue中添加了一个quietPeriodTaskNew任务。在GlobalEventExecutor构造函数中也添加了一个quietPeriodTask任务。因此scheduledTaskQueue中已经有两个ScheduledFutureTask任务。
在这里插入图片描述

测试例子

public static void main(String[] args) throws Exception {
    GlobalEventExecutor INSTANCE = new MyGlobalEventExecutor();
    Promise<String> promise = new DefaultPromise<>(INSTANCE);
    promise.addListener(new GenericFutureListener<Future<? super String>>() {
        @Override
        public void operationComplete(Future<? super String> future) throws Exception {
            Object s = future.get();
            System.out.println(new Date() + "listner1---promise的future返回值:" + s);
        }
    });
    promise.setSuccess(Thread.currentThread().getName() + " promise set ");
}

打断点查看循环情况
在这里插入图片描述

  我觉得fetchFromScheduledTaskQueue() 方法的while()循环应该是考虑到这种情况吧,如果获取到正常任务,则调用其run()方法,最终调用的就是notifyListenersNow()方法。
在这里插入图片描述
  关于notifyListenersNow()方法的实现逻辑,后面来分析。先分析,如果取到的任务是ScheduledFutureTask,其run()方法的实现逻辑。

public void run() {
    assert executor().inEventLoop();
    try {
        if (periodNanos == 0) {
            if (setUncancellableInternal()) {
                V result = task.call();
                setSuccessInternal(result);
            }
        } else {
            // check if is done as it may was cancelled
            if (!isCancelled()) {
                task.call();
                if (!executor().isShutdown()) {
                    long p = periodNanos;
                    if (p > 0) {
                        deadlineNanos += p;
                    } else {
                        deadlineNanos = nanoTime() - p;
                    }
                    if (!isCancelled()) {
                        // scheduledTaskQueue can never be null as we lazy init it before submit the task!
                        Queue<ScheduledFutureTask<?>> scheduledTaskQueue =
                                ((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;
                        assert scheduledTaskQueue != null;
                        scheduledTaskQueue.add(this);
                    }
                }
            }
        }
    } catch (Throwable cause) {
        setFailureInternal(cause);
    }
}

  正常情况下,executor()并没有被关闭也没有被取消,因此isCancelled() && executor().isShutdown() 都为false,则只和periodNanos相关了,如果periodNanos = 0 ,则ScheduledFutureTask将从scheduledTaskQueue中移除掉,如果 periodNanos 不等于0 , 则重新计算 deadlineNanos 的值,并将原ScheduledFutureTask从taskQueue中移动到scheduledTaskQueue中。 当然deadlineNanos的计算也分两种情况,
  第一种情况,当periodNanos 大于0 ,假设为5秒,ScheduledFutureTask 初始化开始时间为 18:00:00,初始化耗时为1秒,ScheduledFutureTask对象创建时间为18:00:01秒,delay 时间为1秒(也就是GlobalEventExecutor 的 SCHEDULE_QUIET_PERIOD_INTERVAL参数默认为1 秒),则deadlineNanos = 2 秒,如果periodNanos为5秒,则第一次调用ScheduledFutureTask的run方法后,deadlineNanos = 7 秒,如果在18:00:07秒之前调用GlobalEventExecutor的takeTask()方法 。 则会走下图中的1 的情况,如果在18:00:07秒之后调用takeTask()方法,则会走下图中2的情况。

takeTask方法图
在这里插入图片描述

  第二种情况,当periodNanos 小于0 ,假设为-5秒,ScheduledFutureTask 初始化开始时间为 18:00:00,初始化耗时为1秒,ScheduledFutureTask对象创建时间为18:00:01秒,delay 时间为1秒(也就是GlobalEventExecutor 的 SCHEDULE_QUIET_PERIOD_INTERVAL参数默认为1 秒),则deadlineNanos = 2 秒,如果执行到下图中的
在这里插入图片描述

  deadlineNanos = nanoTime() - p;这一行代码的时间为18:00:08秒,则deadlineNanos = 8 + 5 = 13,也就是说在18:00:13秒之前,会执行takeTask方法图的第一种情况,否则会执行takeTask方法图第二种情况。 这可能是代码上的含义,从字面意义上来说呢?periodNanos不为0的情况,如果当前线程从队列中取不到任务,当前线程则会退出循环,当其他线程向taskQueue中添加任务,则又会调用TaskRunner的run()方法,此时又会调用taskTask()方法从taskQueue中取元素,如果从队列中获取元素的时间小于deadlineNanos,则会调用调用taskTask()方法的这一行代码task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS); 方法来获取队列中的任务,如果大于deadlineNanos,则会调用taskTask()方法的这一行代码task = taskQueue.poll();来获取队列中的任务 。

  第三种情况,如果periodNanos等于0时,会出现什么情况呢?
在这里插入图片描述
  为什么必须将父类中构造函数GlobalEventExecutor添加的scheduledTaskQueue().add(quietPeriodTask);这一行代码移除掉呢?如果不移除掉。
在这里插入图片描述
  当scheduledTaskQueue中只有我们手动添加的quietPeriodTaskNew时,而quietPeriodTaskNew的periodNanos为0。
在这里插入图片描述
  为什么呢? 因为在scheduledTask的run()方法中,走了下面这一行代码。
在这里插入图片描述
  并没有将ScheduledFutureTask添加到scheduledTaskQueue中,则peekScheduledTask()方法中获取到的scheduledTaskQueue队列中的任务为空,因此进入一直等待。 要测试效果也很容易。
在这里插入图片描述
  分析完safeExecute()方法后,再来分析notifyListeners()方法 。
在这里插入图片描述
  在notifyListeners()方法中,上面加红框代码,是什么意思呢? 依然还是来看一个例子。
在这里插入图片描述

  在GenericFutureListener的operationComplete()方法中,又加入新的Listener,此时新加入的Listener的operationComplete方法依然被调用。
在这里插入图片描述
  InternalThreadLocalMap在之前的博客 Netty源码性能分析 - ThreadLocal PK FastThreadLocal 分析过,也可以直接将其当成ThreadLocal来看,但有一点还是在疑问? 想了很久没有想明白 。

public class TestPromise2 {

    public static void main(String[] args) throws Exception {


        NioEventLoopGroup loopGroup = new NioEventLoopGroup();
        EventLoop next = loopGroup.next();
        Promise<String> promise = (Promise) next.terminationFuture();

        promise.addListener(new GenericFutureListener<Future<? super String>>() {
            @Override
            public void operationComplete(Future<? super String> future) throws Exception {
                Object s = future.get();
                System.out.println(new Date() + "listner1---promise的future返回值:" + s);

                ((DefaultPromise) future).addListener(new GenericFutureListener<Future<? super String>>() {
                    @Override
                    public void operationComplete(Future<? super String> future) throws Exception {
                        Object s = future.get();

                        final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
                        final int stackDepth = threadLocals.futureListenerStackDepth();
                        System.out.println(new Date() + "listner1 inner ---promise的future返回值:" + s + ", stackDepth = " + stackDepth);


                        ((DefaultPromise) future).addListener(new GenericFutureListener<Future<? super String>>() {
                            @Override
                            public void operationComplete(Future<? super String> future) throws Exception {
                                final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
                                final int stackDepth = threadLocals.futureListenerStackDepth();

                                Object s = future.get();
                                System.out.println(new Date() + "listner1 inner  inner ---promise的future返回值:" + s + ", stackDepth ="+stackDepth);
                            }
                        });


                    }
                });
            }
        });


        promise.setSuccess("promise set ");//设置返回结果 并且通知所有Listeners执行回调方法operationComplete方法
    }
}


在这里插入图片描述

  从上例子中,我嵌套了两层,但是threadLocals.futureListenerStackDepth()的值始终是0,并没有随着嵌套的层数增加,而增加。为什么源码中stackDepth < MAX_LISTENER_STACK_DEPTH) 要做这个限制呢?
在这里插入图片描述
  为什么打印两次stackDepth 的值都是0,感兴趣的小伙伴可以去打断点试试 。 还是解释一下吧。
在这里插入图片描述
在这里插入图片描述
  因此在operationComplete()方法的内部调用addListener()并没有我要想像中的,继续调用notifyListenersNow()中的notifyListeners0()方法,而是发现notifyingListeners为true,则直接返回了。 相应的futureListenerStackDepth值也被设置为原来的值, 所以嵌套调用的逻辑就是

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
  我弄了很多的图片,主要想通过打断点的过程来分析notifyListenersNow()方法的执行过程,notifyListenersNow()这个方法看上去简单,但包含的情况还是很多的,因此有兴趣的小伙伴,可以自己写例子去分析为好 。 notifyListener0()方法的内部就是直接调用回调方法operationComplete()了。

private void notifyListeners0(DefaultFutureListeners listeners) {
    GenericFutureListener<?>[] a = listeners.listeners();
    int size = listeners.size();
    for (int i = 0; i < size; i ++) {
        notifyListener0(this, a[i]);
    }
}

private static void notifyListener0(Future future, GenericFutureListener l) {
    try {
        l.operationComplete(future);
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
        }
    }
}
总结

  关于DefaultPromise 源码解析又告一段落了,在分析DefaultPromise 源码过程中,心情也是一波三折, 有些东西,感觉自己懂了,但是一测试,发现又不是那么回事,可能还是测试用例不全面,随着Netty源码的分析深入,我相信这些问题都会得到解决,这篇博客中也有一些疑问没有解决,可能在后面再遇到类似问题,再来补全博客了吧。 下一篇博客见。

源码地址:

https://github.com/quyixiao/test_netty.git

https://gitee.com/quyixiao/netty-netty-4.1.38.Final.git

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/184511.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java深拷贝和浅拷贝Map对象

目录1 将Map深拷贝到另一个Map对象当中2 浅拷贝Map1 将Map深拷贝到另一个Map对象当中 今天赋值的时候遇到的小坑 相关文章推荐&#xff1a; Java克隆方式避免频繁创建对象优化方案 https://blog.csdn.net/ZGL_cyy/article/details/126556907 1.需求说明 将一个MapA对象中所…

Fork之前创建了互斥锁,要警惕死锁问题

文章目录Fork之前创建了互斥锁&#xff0c;要警惕死锁问题使用GDB进行调试如何解决该问题&#xff1f;是否还有别的问题&#xff1f;结论参考文献Fork之前创建了互斥锁&#xff0c;要警惕死锁问题 下面的这段代码会导致子进程出现死锁问题&#xff0c;您看出来了吗&#xff1f…

【MFC】使用MFC框架(10)

MFC不仅仅是一个类库&#xff0c;而且是一个所谓的“设计框架”&#xff0c;注入了很多开发理念和设计思想。类库与框架的区别可以理解为“食材”与“火锅”套餐的区别——火锅套餐已经标明了开发者必须接受已定的一些规则&#xff0c;包括“Message Mapping消息映射机制”、“…

忽略语法细节,从整体上理解函数

从整体上看&#xff0c;C语言代码是由一个一个的函数构成的&#xff0c;除了定义和说明类的语句&#xff08;例如变量定义、宏定义、类型定义等&#xff09;可以放在函数外面&#xff0c;所有具有运算或逻辑处理能力的语句&#xff08;例如加减乘除、if else、for、函数调用等&…

配置中心-开源系统对比分析

一、为什么需要配置中心 1、配置实时生效 传统的静态配置方式要想修改某个配置只能修改之后重新发布应用&#xff0c;要实现动态性&#xff0c;可以选择使用数据库&#xff0c;通过定时轮询访问数据库来感知配置的变化。轮询频率低感知配置变化的延时就长&#xff0c;轮询频率…

运放电路中电容的作用-运算放大器

在运放电路中&#xff0c;大家可能会经常看到这么几个电容&#xff0c;分别是&#xff1a; 1、电源VCC到地 2、反馈输入输出引脚之间 3、正负两输入端之间的电容 就算不要这几个电容&#xff0c;电路好像也能工作&#xff0c;但电路设计一般都会加上&#xff0c;那么这几个电…

软件无线电之数字下变频(Matlab实例)

软件无线电之数字下变频 1 原理 在通信系统中&#xff0c;为了易于信号发射以及实现信道复用&#xff0c;传输的信号发射频率一般很高。 在接收机中&#xff0c;为了降低信号的载波频率或是直接去除载波频率得到基带信号&#xff0c;通常将接收信号与本地振荡器产生的本振信…

Java循环综合案例

文章目录Java循环综合案例案例一&#xff1a;逢 7 跳过案例二&#xff1a;数组元素求和案例三&#xff1a;判断两个数组是否相同案例四&#xff1a;查找元素在数组中的索引案例五&#xff1a;数组元素反转案例六&#xff1a;评委打分案例七&#xff1a;随机产生验证码Java循环综…

那些年我们拿下了 Zynq

小菜鸟的 Zynq 学习经验分享~ 资料来源&#xff1a;黑金 Zynq7035 开发板配套资料&#xff0c;完全适合于 Zynq 学习。 获取方式&#xff1a;【51爱电子】回复【Zynq7000】即可获取资料链接&#xff01;本资料仅供学习使用&#xff0c;切勿商用。 另外四个是关于 Altera FPGA…

跨域和cookie

本文以前端的视角来探讨浏览器的跨域和cookie问题。 一、跨域 跨域简介&#xff1a; 为什么会出现跨域&#xff1f; 出于浏览器的同源策略限制&#xff0c;浏览器会拒绝跨域请求。 什么情况下出现跨域&#xff1f; 不同源就会跨域。同源即&#xff1a;协议、域名、端口号…

图文详解:箭头函数与常规函数的this指向问题

函数中this的指向问题特别容易让人迷糊&#xff0c;这里用示例来指点迷津&#xff0c;走出迷茫。 常规函数下的this指向 1. 纯粹的函数调用 function test(name) { console.log(name) console.log(this) } test(zjcopy) ; test.call(zjcopy, cuclife-2) ; test.call(fal…

pytesseract 安装错误总结

项目场景&#xff1a; 使用eclipse调用pytesseract接口&#xff0c;进行OCR识别。 在anaconda的python3.6.4版本&#xff0c;安装配置pytesseract 问题描述 pip install pytesseract 报错 错误提醒&#xff1a;pytesseract requires Python >3.7 but the running Python…

【数据结构】顺序栈的原理及实现

【数据结构】顺序栈的原理及实现 1.什么是栈 栈它是一种先进后出的有序列表数据结构。栈是线性表里的元素插入和删除只能在该线性表的同一端进行的一种特殊线性表。该线性表的插入和删除都叫栈顶&#xff0c;也就是变化的一端。另一端是固定不变的成为栈底。根据下图可以看出…

《高性能MySQL》——架构与历史(笔记)

文章目录一、MySQL架构与历史1.1.1 连接管理与安全性1.1.2 优化与执行1.2 并发控制1.2.1 读写锁1.2.2 锁粒度&#xff08;锁模式&#xff09;表锁(table lock)行级锁(row lock)1.3 事务1.3.1 隔离级别READ UNCOMITTED (读未提交)READ COMMITTED (读提交)REPEATABLE READ (可重复…

初识C++(学习计划)

前言 基于对C语言的学习&#xff0c;我将进一步学习C的相关知识。 我们在使用C语言时创建的是.c文件&#xff0c;使用C使用的是.cpp文件&#xff0c;其中p——plus&#xff08;加&#xff0c;的意思&#xff09;&#xff0c;所以cpp就是c。 C是为了解决一些C语言不能解决的问题…

SpringBatch使用(一)

一、SpringBatch简介 1、Spring Batch是一个轻量级&#xff0c;全面的批处理框架&#xff0c;旨在开发对企业系统日常运营至关重要的强大批处理应用程序。Spring Batch构建了人们期望的Spring Framework特性&#xff08;生产力&#xff0c;基于POJO的开发方法和一般易用性&…

docker安装elasticsearch kibana 8.6.0(设置密码+汉化+ik分词器)

安装eskibana安装:拉取镜像并安装设置密码汉化配置ik分词器安装: 记得开放使用的端口,或者关闭防火墙 提示:需要提升虚拟机或者服务器的内存到8G以上 拉取镜像并安装 docker pull elasticsearch:8.6.0 docker pull kibana:8.6.0docker network create es-netdocker run -it…

Itext7在PDF指定位置添加电子公章

目录 1. 电子公章的制作 2. java工具keytool生成p12数字证书文件 3. pom依赖 4. 实体类 5. 工具类及测试示例 6. 效果 1. 电子公章的制作 做章网站&#xff1a;http://seal.biaozhiku.com/ 我们选择圆形印章 然后输入公司名&#xff0c;输入章名输入编码然后点击395生成&…

快速幂及矩阵快速幂分析及代码实现

文章目录前言一、认识快速幂二、快速幂思路及代码三、矩阵快速幂3.1、矩阵乘法代码实现3.2、矩阵快速幂代码实现参考资料前言 在学习Acwing c蓝桥杯辅导课第九讲复杂DP-AcWing 1303. 斐波那契前 n 项和时有使用到矩阵快速幂算法&#xff0c;这里来记录下知识点正好也将快速幂部…

车载以太网 - SomeIP测试专栏 - 详细解析 - 01

对于介绍SomeIP协议&#xff0c;我还是想从最基础的协议解析来&#xff0c;所以今天还是先将SomeIP协议详解给大家列举一下&#xff0c;也方便大家在工作中如果不记得哪些信息随时可以查看学习&#xff0c;也算是留给我自己的笔记吧&#xff0c;毕竟确实容易忘记。 SomeIP数据&…