Vert.x 源码解析(4.x)(一)——Context源码解析

news2024/11/17 3:27:50

目录

在这里插入图片描述

1.简介

Vert.x 中,多线程环境下的资源管理和状态维护是一个复杂的问题。为了解决这个问题,Vert.x 引入了 Context 这个核心概念。Context 负责在多线程环境下协调异步操作,提供线程安全的资源访问,并确保异步操作的正确执行顺序。本文将对 Vert.x 的 Context 进行源码解析,探讨它在异步编程中的作用、设计原理以及关键的实现细节。

在这里插入图片描述

2 源代码解析

2.1 Handler

此方法就是一个接口,通过实现该接口,做为回调使用

@FunctionalInterface
public interface Handler<E> {

  /**
   * Something has happened, so handle it.
   *
   * @param event  the event to handle
   */
  void handle(E event);
}

2.2 Context

Context接口,在此方法内部定义了基本的事件处理,比如立即执行任务,阻塞线程执行任务等。

public interface Context {
  /**
   * 当前线程是否在work线程
   */
  static boolean isOnWorkerThread() {
    Thread t = Thread.currentThread();
    return t instanceof VertxThread && ((VertxThread) t).isWorker();
  }
  /**
   * 当前线程是否在eventloop线程
   */
  static boolean isOnEventLoopThread() {
    Thread t = Thread.currentThread();
    return t instanceof VertxThread && !((VertxThread) t).isWorker();
  }
  /**
   * 当前线程是否是vertx线程
   */
  static boolean isOnVertxThread() {
    return Thread.currentThread() instanceof VertxThread;
  }
  /**
   * 当前线程立马运行handler任务,最后调用的也是dispatch
   */
  void runOnContext(Handler<Void> action);
  /**
   * work线程上运行,运行耗时任务,默认如果超出十秒则会写入日志。最后调用的也是dispatch
   * @param blockingCodeHandler 执行逻辑
   * @param ordered 如果为true则进入队列,这样子会串行执行,如果为false则立马执行这样子是并发执行
   * @param <T>
   * @return
   */
  <T> Future<@Nullable T> executeBlocking(Handler<Promise<T>> blockingCodeHandler, boolean ordered);
   /**
   * 是否是eventloopContext
   */   
  boolean isEventLoopContext(); 
  /**
   * 是否是workContext
   */    
  boolean isWorkerContext();
  //get put  是context内部的一个ConcurrentHashMap,用于存储值,其实还有localdatade的get,put方法
  <T> T get(Object key);
  void put(Object key, Object value);
  //部署id 
  String deploymentID();
    
}

2.3 ContextInternal

ContextInternal是Context接口的一个扩展,它增加了一些内部使用的方法和功能。它通常不直接暴露给应用程序开发者,比如内部获取当前线程的Context。

dispatch中beginDispatch和endDispatch是一个很重要的方法,如果是Vertx的线程他是让当前线程所绑定的Context先切换当下的Context,接着在end里面重新切换回去。如果不是Vertx的线程就用ThreadLocal进行切换当前Context,在end中切换回去

public interface ContextInternal extends Context {
    
   //获取当前线程Context,所以原则上每个线程只有一个Context
  static ContextInternal current() {
    Thread thread = Thread.currentThread();
    //如果是Vertx的线程,则直接获取,因为每个Vertx线程创建的时候会绑定Context
    if (thread instanceof VertxThread) {
      return ((VertxThread) thread).context();
    } else {
      //如果不是Vert的线程则从ThreadLocal里进行获取
      VertxImpl.ContextDispatch current = VertxImpl.nonVertxContextDispatch.get();
      if (current != null) {
        return current.context;
      }
    }
    return null;
  }        
  /**
   * 里面包含线程池,包括对线程池进行监测性能的PoolMetrics
   * @return the context worker pool
   */
  WorkerPool workerPool();
  @Override
  /**
   * 通过执行器立马执行任务,参数就是Handler
   * @param action  the action to run
   */
  @Override
  default void runOnContext(Handler<Void> action) {
    executor().execute(() -> dispatch(action));
  }
  default void dispatch(Handler<Void> handler) {
    dispatch(null, handler);
  }
  //你看dispatch其实就是执行传入进来的Handler类的handle方法
  //这个方法很重要的方法就是讲线程绑定的context做切换
  default <E> void dispatch(E event, Handler<E> handler) {
    //beginDispatch,endDispatch主要是调用Vertx的方法执行,
    //context做切换以及比如一些执行时间统计、类加载器绑定等
    ContextInternal prev = beginDispatch();
    try {
      //重点执行handle
      handler.handle(event);
    } catch (Throwable t) {
      reportException(t);
    } finally {
      endDispatch(prev);
    }
  }
    /**
   * 切换当前的上下文Context
   */  
  default ContextInternal beginDispatch() {
    VertxImpl vertx = (VertxImpl) owner();
    return vertx.beginDispatch(this);
  }

  /**
   * 重新切换原先的上下文Context
   */
  default void endDispatch(ContextInternal previous) {
    VertxImpl vertx = (VertxImpl) owner();
    vertx.endDispatch(previous);
  }  
  
}

vertx.beginDispatch

该方法是Vertx的实现类VertxImpl里的。

具体就是先获取原先该线程或者ThreadLocal有无绑定的Context,有的话把这个context保留下来赋值给prev,并且把传入进来的context与当前线程和ThreadLocal绑定

//存储Context的类
static final ThreadLocal<ContextDispatch> nonVertxContextDispatch = new ThreadLocal<>(); 
//内部包含context以及classLoader
static class ContextDispatch {
    ContextInternal context;
    ClassLoader topLevelTCCL;
}

ContextInternal beginDispatch(ContextInternal context) {
    //获取当前线程
    Thread thread = Thread.currentThread();
    ContextInternal prev;
    //判断是否是VertxThread
    if (thread instanceof VertxThread) {
      //如果是则直接根据VertxThread里获取Context
      VertxThread vertxThread = (VertxThread) thread;
      prev = vertxThread.context;
      //执行时间统计
      if (!ContextBase.DISABLE_TIMINGS) {
        vertxThread.executeStart();
      }
      //将线程context改成传入进来的context
      vertxThread.context = context;
      if (!disableTCCL) {
        if (prev == null) {
          vertxThread.topLevelTCCL = Thread.currentThread().getContextClassLoader();
        }
        if (context != null) {
          thread.setContextClassLoader(context.classLoader());
        }
      }
    } else {
      //如果不是则执行该方法
      prev = beginDispatch2(thread, context);
    }
    //最后返回原先的Context(这里保留原先的context主要是后面之行结束了,线程的context要切换回去)
    return prev;
  }
-----------------------beginDispatch2-------------------方法
  private ContextInternal beginDispatch2(Thread thread, ContextInternal context) {
    //从ThreadLocal里进行获取ContextDispatch,内部就是Context以及classload
    ContextDispatch current = nonVertxContextDispatch.get();
    ContextInternal prev;
    //如果当前的不为空,则将prev设置成当前的context
    if (current != null) {
      prev = current.context;
    } else {
      //如果为空,则重新new一个设置会null并且在thradlocal里存储
      current = new ContextDispatch();
      nonVertxContextDispatch.set(current);
      prev = null;
    }
    //将传入的context赋值给当前线程context
    current.context = context;
    if (!disableTCCL) {
      if (prev == null) {
        current.topLevelTCCL = Thread.currentThread().getContextClassLoader();
      }
      thread.setContextClassLoader(context.classLoader());
    }
    //返回原先(当前)的context
    return prev;
  }

vertx.endDispatch

这里就是将startDispatch返回的原始绑定的Context传入进来进行切换。

void endDispatch(ContextInternal prev) {
  //获取当前线程,prev是刚刚startDispathc返回的值传进来的
  Thread thread = Thread.currentThread();
  if (thread instanceof VertxThread) {
    VertxThread vertxThread = (VertxThread) thread;
    //将当前线程的context切换回原来的
    vertxThread.context = prev;
    if (!disableTCCL) {
      ClassLoader tccl;
      if (prev == null) {
        tccl = vertxThread.topLevelTCCL;
        vertxThread.topLevelTCCL = null;
      } else {
        tccl = prev.classLoader();
      }
      Thread.currentThread().setContextClassLoader(tccl);
    }
    //这边就是结束时间
    if (!ContextBase.DISABLE_TIMINGS) {
      vertxThread.executeEnd();
    }
  } else {
    endDispatch2(prev);
  }
}

private void endDispatch2(ContextInternal prev) {
  ClassLoader tccl;
  //这边也是同理将原先的prev绑定重新给到threadLocal,如果为空则直接remove
  ContextDispatch current = nonVertxContextDispatch.get();
  if (prev != null) {
    current.context = prev;
    tccl = prev.classLoader();
  } else {
    nonVertxContextDispatch.remove();
    tccl = current.topLevelTCCL;
  }
  if (!disableTCCL) {
    Thread.currentThread().setContextClassLoader(tccl);
  }
}

2.4 ContextBase

2.4.1 ContextBase代码

是ContextInternal的实现类,它包含具体功能的实现,比如执行耗时任务,

public abstract class ContextBase implements ContextInternal {
  //数据存储
  private ConcurrentMap<Object, Object> data;
  //本地数据存储
  private ConcurrentMap<Object, Object> localData;
  //内部任务队列
  final TaskQueue internalOrderedTasks;
  //内部任务线程池
  final WorkerPool internalWorkerPool;
  //工作线程池,外部调用
  final WorkerPool workerPool;
  //任务队列
  final TaskQueue orderedTasks; 
  //eventloop线程
  private final EventLoop eventLoop;
 ........
     
  /**
   *
   * @param vertx
   * @param eventLoop  它从eventGroup里进行取出来的,用于处理事件
   * @param internalWorkerPool 可能会造成阻塞的内部任务执行器
   * @param workerPool  工作现场执行器,用于普通的异步任务
   * @param deployment  用于管理当前Verticle的部署和卸载
   * @param closeFuture 关闭的future
   * @param tccl  当前线程的classLoader
   */
  protected ContextBase(VertxInternal vertx,
                        EventLoop eventLoop,
                        WorkerPool internalWorkerPool,
                        WorkerPool workerPool,
                        Deployment deployment,
                        CloseFuture closeFuture,
                        ClassLoader tccl) {
    this.deployment = deployment;
    this.config = deployment != null ? deployment.config() : new JsonObject();
    this.eventLoop = eventLoop;
    this.tccl = tccl;
    this.owner = vertx;
    this.workerPool = workerPool;
    this.closeFuture = closeFuture;
    this.internalWorkerPool = internalWorkerPool;
    //有序任务队列,内部都是用的LinkedList,目的肯定是有序
    this.orderedTasks = new TaskQueue();
    //用于阻塞的任务队列
    this.internalOrderedTasks = new TaskQueue();
  }     
  //-------------------executeBlocking系列--------------------------------------------
  @Override
  public <T> Future<T> executeBlockingInternal(Handler<Promise<T>> action) {
    return executeBlocking(this, action, internalWorkerPool, internalOrderedTasks);
  }
  @Override
  public <T> Future<T> executeBlockingInternal(Handler<Promise<T>> action, boolean ordered) {
    return executeBlocking(this, action, internalWorkerPool, ordered ? internalOrderedTasks : null);
  }

  @Override
  public <T> Future<T> executeBlocking(Handler<Promise<T>> blockingCodeHandler, boolean ordered) {
    return executeBlocking(this, blockingCodeHandler, workerPool, ordered ? orderedTasks : null);
  }
  @Override
  public <T> Future<T> executeBlocking(Handler<Promise<T>> blockingCodeHandler, TaskQueue queue) {
    return executeBlocking(this, blockingCodeHandler, workerPool, queue);
  }
    
 /**
   * 上面的方法最后还是调用该方法
   * @param context contex上下文就是本身
   * @param blockingCodeHandler 就是传入进来的Handler(实际需要执行的任务)
   * @param workerPool 内部包含线程池以及性能检测指标类
   * @param queue 队列,根据是否有队列来判断是顺序执行还是并发执行
   * @param <T>
   * @return
   */
  static <T> Future<T> executeBlocking(ContextInternal context, Handler<Promise<T>> blockingCodeHandler,
      WorkerPool workerPool, TaskQueue queue) {
    //是个接口,根据实现它的begin、rejected、end内写入监测代码,后文给你展示下
    PoolMetrics metrics = workerPool.metrics();
    Object queueMetric = metrics != null ? metrics.submitted() : null;
    //获取Promise,就是我们前面ContextInternal的方法
    Promise<T> promise = context.promise();
    Future<T> fut = promise.future();
    try {
      //创建Runnable
      Runnable command = () -> {
        Object execMetric = null;
        //这边就是当他不为空时会调用监测类的begin方法
        if (metrics != null) {
          execMetric = metrics.begin(queueMetric);
        }
        //这个方法就是ContextInternal内实现的方法,就是调用handle.handle,这里重新创建了一个Handler,并且把传入进来的Handler写在它的实现方法里
        context.dispatch(promise, f -> {
          try {
            //传递了promise,promise是用来告知结果的
            blockingCodeHandler.handle(promise);
          } catch (Throwable e) {
            promise.tryFail(e);
          }
        });
        //这里也是监测类的end方法
        if (metrics != null) {
          metrics.end(execMetric, fut.succeeded());
        }
      };
      //这里获取workerPool的线程池
      Executor exec = workerPool.executor();
      //如果队列没有则直接线程池执行,如果有则提交给队列执行
      if (queue != null) {
        queue.execute(command, exec);
      } else {
        exec.execute(command);
      }
    } catch (RejectedExecutionException e) {
      // 任务异常监测调用
      if (metrics != null) {
        metrics.rejected(queueMetric);
      }
      throw e;
    }
    //这里返回future
    return fut;
  }
    //-------------------execute,runOnContext,emit--------------------------------------------
    //exceute以及runOnContext,emit实际执行方法都交由子类进行执行
  @Override
  public void execute(Runnable task) {
    execute(this, task);
  }
  
  protected abstract <T> void execute(ContextInternal ctx, Runnable task);

  @Override
  public final <T> void execute(T argument, Handler<T> task) {
    execute(this, argument, task);
  }

  protected abstract <T> void execute(ContextInternal ctx, T argument, Handler<T> task);

  @Override
  public <T> void emit(T argument, Handler<T> task) {
    emit(this, argument, task);
  }

  protected abstract <T> void emit(ContextInternal ctx, T argument, Handler<T> task);
    
}

2.4.2 TaskQueue

内部定义了一个LinkedList来存储任务,接着按照LinkedList添加顺序来顺序执行任务,因为代码量比较少,我全部添加进来并且加了注释。

public class TaskQueue {

  static final Logger log = LoggerFactory.getLogger(TaskQueue.class);

  private static class Task {

    private final Runnable runnable;
    private final Executor exec;

    public Task(Runnable runnable, Executor exec) {
      this.runnable = runnable;
      this.exec = exec;
    }
  }

  // @protectedby tasks
  private final LinkedList<Task> tasks = new LinkedList<>();

  // @protectedby tasks
  private Executor current;

  private final Runnable runner;

  //在开始就进行初始化了runner执行的就是run方法
  public TaskQueue() {
    runner = this::run;
  }
  /**
   * Run a task.
   *
   * @param task the task to run.
   */
  public void execute(Runnable task, Executor executor) {
    //同步,防止多线程添加,这样子能保证顺序
    synchronized (tasks) {
      //将任务添加进去
      tasks.add(new Task(task, executor));
      //判断当前线程池是否为空,为空则赋值,并且通过executor执行runner,实际执行的就是下面那个run方法
      if (current == null) {
        current = executor;
        try {
          executor.execute(runner);
        } catch (RejectedExecutionException e) {
          current = null;
          throw e;
        }
      }
    }
  }
  //实际执行的就是这一段,把task里的任务获取进行执行,没有任务则退出,不然就死循环执行
  private void run() {
    for (; ; ) {
      final Task task;
      //加锁,保证任务队列的操作是线程安全的
      synchronized (tasks) {
        task = tasks.poll();
        //没有任务直接退出死循环
        if (task == null) {
          current = null;
          return;
        }
        //如果task的线程与当前线程不是同一个线程
        if (task.exec != current) {
          //不是则将任务重新添加进tasks
          tasks.addFirst(task);
          //并且重新执行run方法
          task.exec.execute(runner);
          //设置当前current位当前的线程
          current = task.exec;
          return;
        }
      }
      try {
        //是当前线程直接执行
        task.runnable.run();
      } catch (Throwable t) {
        log.error("Caught unexpected Throwable", t);
      }
    }
  };


}

2.4.3 PoolMetrics

可以根据实现PoolMetrics在其begin、rejected、end内写入监测代码

public interface PoolMetrics<T> extends Metrics {

  /**
   * 任务提交
   */
  default T submitted() {
    return null;
  }

  /**
   * 任务开始
   * The submitted task start to use the resource.
   *
   */
  default T begin(T t) {
    return null;
  }

  /**
   * 记录任务被拒绝执行
   */
  default void rejected(T t) {
  }

  /**
   * 任务结束
   */
  default void end(T t, boolean succeeded) {
  }

}

2.5 WorkerContext

public class WorkerContext extends ContextBase {
  WorkerContext(VertxInternal vertx,
                WorkerPool internalBlockingPool,
                WorkerPool workerPool,
                Deployment deployment,
                CloseFuture closeFuture,
                ClassLoader tccl) {
    super(vertx, vertx.getEventLoopGroup().next(), internalBlockingPool, workerPool, deployment, closeFuture, tccl);
  }
 
  //----------------------runOnContext实际实现-----------------------------  
  @Override
  protected void runOnContext(ContextInternal ctx, Handler<Void> action) {
    try {
      run(ctx, null, action);
    } catch (RejectedExecutionException ignore) {
      // Pool is already shut down
    }
  }
  private <T> void run(ContextInternal ctx, T value, Handler<T> task) {
    Objects.requireNonNull(task, "Task handler must not be null");
     //调用executor(),以及newRunaable,内部就是ContextInternal的ctx.dispatch(value, task)方法
    executor().execute(() -> ctx.dispatch(value, task));
  }
  /**
   * 看代码实际上也是加入到orderedTasks队列进行执行
   * @return
   */    
  @Override
  public Executor executor() {
    if (executor == null) {
      executor = command -> {
        PoolMetrics metrics = workerPool.metrics();
        Object queueMetric = metrics != null ? metrics.submitted() : null;
        //加入到队列
        orderedTasks.execute(() -> {
          Object execMetric = null;
          if (metrics != null) {
            execMetric = metrics.begin(queueMetric);
          }
          try {
            command.run();
          } finally {
            if (metrics != null) {
              metrics.end(execMetric, true);
            }
          }
        }, workerPool.executor());
      };
    }
    return executor;
  }    
 //---------------------------execute和emit实际实现----------------------------
 //execute和emit实现的都是调用execute(orderedTasks, argument, task)方法,无非就是一个通过外部Handler,一个内部又new Handler了一个把传入进来的  //Handler给了ContextInternal的ctx.dispatch(value, task)方法执行
  @Override
  protected <T> void execute(ContextInternal ctx, T argument, Handler<T> task) {
    execute(orderedTasks, argument, task);
  }

  @Override
  protected <T> void emit(ContextInternal ctx, T argument, Handler<T> task) {
    execute(orderedTasks, argument, arg -> {
      ctx.dispatch(arg, task);
    });
  }
    
  /**
   * 执行任务
   * @param queue 队列
   * @param argument 执行返回值
   * @param task task handler
   * @param <T>
   */
  private <T> void execute(TaskQueue queue, T argument, Handler<T> task) {
     //判断当前调用线程是否work线程,是的话则直接执行,不是的话进入队列执行
    if (Context.isOnWorkerThread()) {
      task.handle(argument);
    } else {
      PoolMetrics metrics = workerPool.metrics();
      Object queueMetric = metrics != null ? metrics.submitted() : null;
      //进入队列执行
      queue.execute(() -> {
        Object execMetric = null;
        if (metrics != null) {
          execMetric = metrics.begin(queueMetric);
        }
        try {
          task.handle(argument);
        } finally {
          if (metrics != null) {
            metrics.end(execMetric, true);
          }
        }
      }, workerPool.executor());
    }
  }    
}

2.5.2 总结

**execute(): **

1.如果当前线程是work线程则直接执行。

2.如果不是则进入orderedTasks队列顺序执行,并且在执行前面添加了PoolMetrics的监测方法。

3.但是都带有返回值

runOnContext:

进入orderedTasks队列顺序执行,用的workerpool,并且在执行前面添加了PoolMetrics的监测方法

emit:

1.如果当前线程是work线程则直接执行。

2.如果不是则进入orderedTasks队列顺序执行,并且在执行前面添加了PoolMetrics的监测方法。

3.但是都带有返回值

4.唯一与execute不同的是它重新创建了一个handler,把传入进来的handler给了ContextInternal.dispatch执行

2.6 EventLoopContext

2.6.1 代码分析

public class EventLoopContext extends ContextBase {
  //都是调用的父类,所以注释写在父类里面
  EventLoopContext(VertxInternal vertx,
                   EventLoop eventLoop,
                   WorkerPool internalBlockingPool,
                   WorkerPool workerPool,
                   Deployment deployment,
                   CloseFuture closeFuture,
                   ClassLoader tccl) {
    super(vertx, eventLoop, internalBlockingPool, workerPool, deployment, closeFuture, tccl);
  }
 //----------------------runOnContext实际实现-----------------------------    
  /**
   * 直接EventLoop直接执行
   * @param ctx
   * @param action
   */
  @Override
  protected void runOnContext(ContextInternal ctx, Handler<Void> action) {
    try {
      nettyEventLoop().execute(() -> ctx.dispatch(action));
    } catch (RejectedExecutionException ignore) {
      // Pool is already shut down
    }
  }
  public EventLoop nettyEventLoop() {
    return eventLoop;
  }
 //---------------------------execute和emit实际实现----------------------------  
/**
   * 直接在eventloop里面进行执行
   * @param ctx
   * @param argument
   * @param task
   * @param <T>
   */
  @Override
  protected <T> void emit(ContextInternal ctx, T argument, Handler<T> task) {
    EventLoop eventLoop = nettyEventLoop();
    //如果是eventloop直接执行
    if (eventLoop.inEventLoop()) {
      ContextInternal prev = ctx.beginDispatch();
      try {
        task.handle(argument);
      } catch (Throwable t) {
        reportException(t);
      } finally {
        ctx.endDispatch(prev);
      }
    } else {
      //如果不是则直接调用eventloop线程执行
      eventLoop.execute(() -> emit(ctx, argument, task));
    }
  }

  /**
   * 还是在eventloop内部执行
   * <ul>
   *   <li>When the current thread is event-loop thread of this context the implementation will execute the {@code task} directly</li>
   *   <li>Otherwise the task will be scheduled on the event-loop thread for execution</li>
   * </ul>
   */
  @Override
  protected <T> void execute(ContextInternal ctx, T argument, Handler<T> task) {
    EventLoop eventLoop = nettyEventLoop();
    if (eventLoop.inEventLoop()) {
      task.handle(argument);
    } else {
      eventLoop.execute(() -> task.handle(argument));
    }
  }

  /**
   * 还是在eventloop内部执行
   * @param ctx
   * @param task
   * @param <T>
   */
  @Override
  protected <T> void execute(ContextInternal ctx, Runnable task) {
    EventLoop eventLoop = nettyEventLoop();
    if (eventLoop.inEventLoop()) {
      task.run();
    } else {
      eventLoop.execute(task);
    }
  }
}

2.6.2 总结

execute(): 进入EventLoop线程直接执行

runOnContext:直接也是进入EventLoop线程调用ContextInternal.dispatch执行

emit:使用EventLoop线程执行,自己加上了beginDispatch和end方法,实际是和ContextInternal的dispatch内部方法是一样的

2.7 总结

主要总结:

ContextBase内部默认实现了executeBlocking和executeBlockingInternal方法,用这个来执行队列任务。

EventLoopContext则是全部由EventLoop线程来进行执行

WorkerContext则主要是通过线程池进队列执行。

3. 使用方式

3.1 创建

那么它是在哪里使用的呢,它其实贯穿Vertx的始末,只要由任务执行,线程相关都有它。接下来我举例子。

首先它是在VertImpl里进行创建的

比如创建EventLoopContext的创建和WorkContext的创建

@Override
public EventLoopContext createEventLoopContext(Deployment deployment, CloseFuture closeFuture, WorkerPool workerPool, ClassLoader tccl) {
  return new EventLoopContext(this, eventLoopGroup.next(), internalWorkerPool, workerPool != null ? workerPool : this.workerPool, deployment, closeFuture, disableTCCL ? null : tccl);
}

@Override
public EventLoopContext createEventLoopContext(EventLoop eventLoop, WorkerPool workerPool, ClassLoader tccl) {
  return new EventLoopContext(this, eventLoop, internalWorkerPool, workerPool != null ? workerPool : this.workerPool, null, closeFuture, disableTCCL ? tccl : null);
}

@Override
public EventLoopContext createEventLoopContext() {
  return createEventLoopContext(null, closeFuture, null, Thread.currentThread().getContextClassLoader());
}

@Override
public WorkerContext createWorkerContext(Deployment deployment, CloseFuture closeFuture, WorkerPool workerPool, ClassLoader tccl) {
  return new WorkerContext(this, internalWorkerPool, workerPool != null ? workerPool : this.workerPool, deployment, closeFuture, disableTCCL ? null : tccl);
}

@Override
public WorkerContext createWorkerContext() {
  return createWorkerContext(null, closeFuture, null, Thread.currentThread().getContextClassLoader());
}

那么在哪里调用呢

3.2 调用获取

VertxImpl

先获取再调用

//比如先获取后创建
public ContextInternal getOrCreateContext() {
  ContextInternal ctx = getContext();
  if (ctx == null) {
    // We are running embedded - Create a context
    //如果为空则进行创建并且加入到stickyContext里面
    ctx = createEventLoopContext();
    stickyContext.set(new WeakReference<>(ctx));
  }
  return ctx;
}

DeploymentManager的doDeploy方法

就是再Vert.x调用deployVerticle进行部署的时候会调用

ContextBase context = (options.isWorker() ? vertx.createWorkerContext(deployment, closeFuture, workerPool, tccl) :

3.3 使用

使用的地方太多了,包括NetServer,NetClient等等等等,它内部的EventLoop就是用于Netty的客户端服务端收发信息所用,后续将Netserve和NetClient等会详细说明

4 额外说明

Context和线程其实没有强关联关系。

当部署的时候会创建一个新的Context。

当不是部署的时候获取Context会首先获取Thread以及ThreadLocal里的Context,如果没有的话则会创建一个EventLoopContext并且添加到ThreadLocal。但是这是会更改的,当当前的线程通过其他Context执行任务的时候就会将当前线程的Context进行替换,直到执行完任务才会将Context替换回去

问题

为什么要先介绍Context呢?

因为Vertx的Context贯穿始末,包括Server,部署,EventBus等等等等,都是会用到的,因为它负责管理线程,异步任务执行。

源码注释版本

我目前在源码上都加注释,后续如果有需要可以找我拿加了代码注释的Vert.x源码

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/957766.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring MVC工作流程

SpringMVC 的执行流程如下。 用户通过浏览器发起一个 HTTP 请求&#xff0c;该请求会被 DispatcherServlet&#xff08;前端控制器&#xff09;拦截&#xff1b;DispatcherServlet 调用 HandlerMapping&#xff08;处理器映射器&#xff09;找到具体的处理器&#xff08;Handl…

LinuxUbuntu安装OpenWAF

Linux&Ubuntu安装OpenWAF 官方GitHub地址 介绍 OpenWAF&#xff08;Web Application Firewall&#xff09;是一个开源的Web应用防火墙&#xff0c;用于保护Web应用程序免受各种网络攻击。它通过与Web服务器集成&#xff0c;监控和过滤对Web应用程序的流量&#xff0c;识…

基于移动端的校园失物招领系统 微信小程序的设计与实现779m5

于校园失物招领系统功能所牵扯的数据都是通过失主进行校园失物招领系统等相关的数据信息内容、并且可以实现首页、个人中心、失主管理、物品类型管理、失物展示管理、失物认领管理、在线投诉管理、论坛交流、系统管理等功能可以通过系统进行分配&#xff0c;传统的手工作业模式…

WebDAV之π-Disk派盘 + notototo

notototo是一款功能丰富的笔记软件,提供了多种功能,包括载入PDF文件并进行批注和标记的能力。您可以使用Apple Pencil或手指在PDF文件上进行写作和绘图操作。 同时,notototo也提供了与团队合作的功能,您可以连接到服务器并与他人协作。此外,您还可以在notototo中进行绘图,…

Dolphin for Mac(Wii游戏模拟器)配置指南

Wii模拟器Dolphin Mac是款适合Mac电脑中的游戏玩家们使用的模拟器工具。Wii模拟器Dolphin Mac官方版支持直接运行游戏镜像文件&#xff0c;玩家可以将游戏ISO拷贝到某一个文件夹中统一进行管理。Wii模拟器Dolphin Mac除了键盘和鼠标外&#xff0c;还支持配合原版的Wii遥控器操作…

MySQL告警“Connection attributes of length 570 were truncated“

mysql的错误日志中看到如下报错"[Warning] Connection attributes of length 571 were truncated"。比如&#xff1a; 2023-09-01T08:37:49.87392408:00 9149015 [Warning] [MY-010288] [Server] Connection attributes of length 570 were truncated (76 bytes los…

ip route get ip地址 应用案例

应用场景 在做虚拟化实验用的虚拟机和实际的ECS云主机一般都会有多个网卡&#xff0c;网络的联通性是经常碰到的问题。比如在一个VM上有3个网卡&#xff0c;分别为ens160(和寄主机进行桥接的网卡10.0.0.128)、ens224&#xff08;连接仅主机网络10.0.0.0/24的网卡10.0.0.128&…

三维模型OBJ格式轻量化顶点压缩主要技术方法分析

三维模型OBJ格式轻量化顶点压缩主要技术方法分析 三维模型的OBJ格式轻量化中&#xff0c;顶点压缩是一项重要的技术方法&#xff0c;用于减小模型文件的大小。以下是关于三维模型OBJ格式轻量化顶点压缩的主要技术方法的分析&#xff1a; 1、顶点位置量化&#xff1a; 顶点位置…

直播平台源码弹性云托管技术:稳定直播与降低成本的利器

在当今的互联网时代&#xff0c;直播平台源码层出不穷&#xff0c;直播平台源码不仅可以让人们获取最新的资讯、查找资料等信息获取&#xff0c;还能让人们在其中观看短视频、直播、与其他人聊天等互动放松&#xff0c;直播平台源码的受欢迎与平台人数的增加使得人们在选择直播…

Python爬虫(十七)_糗事百科案例

糗事百科实例 爬取糗事百科段子&#xff0c;假设页面的URL是: http://www.qiushibaike.com/8hr/page/1 要求&#xff1a; 使用requests获取页面信息&#xff0c;用XPath/re做数据提取获取每个帖子里的用户头像连接、用户姓名、段子内容、点赞次数和评论次数保存到json文件内…

使用spring自带的发布订阅机制来实现消息发布订阅

背景 公司的项目以前代码里面有存在使用spring自带发布订阅的代码&#xff0c;因此稍微学习一下如何使用&#xff0c;并了解一下这种实现方式的优缺点。 优点 实现方便&#xff0c;代码方面基本只需要定义消息体和消费者&#xff0c;适用于小型应用程序。不依赖外部中间件&a…

JavaScript设计模式(二)——简单工厂模式、抽象工厂模式、建造者模式

个人简介 &#x1f440;个人主页&#xff1a; 前端杂货铺 &#x1f64b;‍♂️学习方向&#xff1a; 主攻前端方向&#xff0c;正逐渐往全干发展 &#x1f4c3;个人状态&#xff1a; 研发工程师&#xff0c;现效力于中国工业软件事业 &#x1f680;人生格言&#xff1a; 积跬步…

pdfh5在线预览pdf文件

前言 pc浏览器和ios的浏览器都可以直接在线显示pdf文件&#xff0c;但是android浏览器不能在线预览pdf文件&#xff0c;如何预览pdf文件&#xff1f; Github: https://github.com/gjTool/pdfh5 Gitee: https://gitee.com/gjTool/pdfh5 使用pdfh5预览pdf 编写预览页面 <…

Spring Framework 学习笔记1:基础

Spring Framework 学习笔记1&#xff1a;基础 1.简介 1.1.生态和发展史 关于 Spring 的生态和发展史&#xff0c;可以观看这个视频。 1.2.系统架构 关于 Spring 的系统架构&#xff0c;可以观看这个视频。 2.Ioc Spring 的核心概念是 Ioc &#xff08;Inversion Of Cont…

grafana8.3创建告警规则

1. 部署grafana的配置文件修改 因为要采用发送邮件的方式通知告警内容所以&#xff0c;在部署grafana时要先配置好SMTP / Emailing的内容&#xff1a; [smtp]enabled true # 开启smtphost smtp.mxhichina.com:465 #设置邮箱服务器地址user testtest.com #设置邮箱用户pas…

Flink SQL你用了吗?

分析&回答 Flink 1.1.0&#xff1a;第一次引入 SQL 模块&#xff0c;并且提供 TableAPI&#xff0c;当然&#xff0c;这时候的功能还非常有限。Flink 1.3.0&#xff1a;在 Streaming SQL 上支持了 Retractions&#xff0c;显著提高了 Streaming SQL 的易用性&#xff0c;使…

排序之选择排序

文章目录 前言一、直接选择排序1、直接选择排序基本思想2、直接选择排序代码实现3、直接选择排序的效率 二、堆排序1、堆排序2、堆排序的效率 前言 选择排序的基本思想就是每一次从待排序的数据元素中选出最小(或最大)的一个元素&#xff0c;存放在序列的起始位置&#xff0c;…

汇编--int指令

中断信息可以来自CPU的内部和外部&#xff0c; 当CPU的内部有需要处理的事情发生的时候&#xff0c;将产生需要马上处理的中断信息&#xff0c;引发中断过程。在http://t.csdn.cn/jihpG&#xff0c;我们讲解了中断过程和两种内中断的处理。 这一章中&#xff0c; 我们讲解另一种…

freemarker学习+集成springboot+导出word

目录 一 FreeMarker简介 二 集成springboot&#xff0c;实现案例导出 三 常见面试题总结 一 FreeMarker简介 FreeMarker 是一款 模板引擎&#xff1a; 即一种基于模板和要改变的数据&#xff0c; 并用来生成输出文本(HTML网页&#xff0c;电子邮件&#xff0c;配置文件&…

2024河南光伏展|河南储能展|河南国际太阳能光伏储能展览会

2024第四届中国&#xff08;郑州&#xff09;太阳能光伏及储能产业展览会 时间&#xff1a;2024年2月26-28日 地点&#xff1a;郑州.中原国际博览中心 河南国际太阳能光伏及储能产业展览会是一个盛大的行业聚会&#xff0c;旨在展示、交流、合作和创新。这个展览会将会是一个…