前言
Dubbo3 提供了一个挺有意思的 Executor,用来将提交到线程池里的任务按顺序串行执行。
需求背景:你有一个线程池,但是你不想修改它,现在你的需求是要把提交上去的任务按顺序串行执行。
在这样一个需求背景下,SerializingExecutor 诞生了。
SerializingExecutor 在 Dubbo3 的应用场景是:针对 HTTP2 上的 Stream 接收到的 Frame 要按顺序处理。
SerializingExecutor
SerializingExecutor 类图很简单,首先它实现了 Executor 接口,意味着它可以执行提交的任务。当然了,它本身不创建线程,会依赖实际的 Executor 执行任务。
它还实现了 Runnable 接口,意味着它也是一个可执行的任务,可以被提交到 Executor 里执行。
SerializingExecutor 按顺序串行执行任务的逻辑很简单,核心是:提交的任务先入队等待,而后再按顺序串行化的调度任务执行。
public final class SerializingExecutor implements Executor, Runnable {
// 运行状态 CAS防止并发
private final AtomicBoolean atomicBoolean = new AtomicBoolean();
// 实际跑任务的线程池
private final Executor executor;
// 任务队列
private final Queue<Runnable> runQueue = new ConcurrentLinkedQueue<>();
}
SerializingExecutor 本身不创建线程,不具备异步执行任务的能力,它依赖一个实际干活的 Executor。
public SerializingExecutor(Executor executor) {
this.executor = executor;
}
它重写了execute()
方法,避免任务被直接执行,而是先入队等待,再自己去调度执行。
@Override
public void execute(Runnable r) {
// 先入队
runQueue.add(r);
// 调度执行
schedule(r);
}
schedule()
调度任务执行,通过CAS防止并发。它把自己提交到 executor 里去执行了,所以我们重点看run()
。
private void schedule(Runnable removable) {
if (atomicBoolean.compareAndSet(false, true)) {
boolean success = false;
try {
executor.execute(this);
success = true;
} finally {
if (!success) {
if (removable != null) {
runQueue.remove(removable);
}
atomicBoolean.set(false);
}
}
}
}
run()
也很简单,就是循环从队列里取出任务,然后挨个执行,队列本身保证了任务的先进先出,所以任务是按顺序串行执行的。
@Override
public void run() {
Runnable r;
try {
// 循环出队 先进先出 按顺序执行
while ((r = runQueue.poll()) != null) {
InternalThreadLocalMap internalThreadLocalMap = InternalThreadLocalMap.getAndRemove();
try {
r.run();
} catch (RuntimeException e) {
LOGGER.error(COMMON_ERROR_RUN_THREAD_TASK, "", "", "Exception while executing runnable " + r, e);
} finally {
InternalThreadLocalMap.set(internalThreadLocalMap);
}
}
} finally {
atomicBoolean.set(false);
}
if (!runQueue.isEmpty()) {
schedule(null);
}
}