DolphinSchedule基于事件驱动的高性能并发编程

news2024/11/17 22:18:58

文章目录

  • 前言
  • 前置知识
    • 异步编程
    • 基于时间驱动的异步编程模式(EAP Event-based Asynchronous Pattern )
    • 实现EAP
  • DolphinSchedule结合Netty实现Master与Worker之间的高性能处理能力的设计
    • 方案设计
    • 代码实现
  • 总结

前言

研究DolphinSchedule的内因在于对调度系统并发性能的追求,本文重点介绍在对调度系统处理任务实例的并发性能改造工作中对EAP(Event-based Asynchronous Pattern 基于事件的异步编程模式)的探索工作。道阻且长,力行必远。本文仅是个人观点,如有不严谨之处请多多指正。欢迎大家交流讨论,+V:zhouwang930628。

前置知识

异步编程

首先请大家思考一个问题,哪些方式可以实现异步编程?
实现多线程的几种基本手段:继承Thread类,实现Runnable接口,使用FutureTask和Callable接口,使用线程池。
除了这些基本使用方法,我们又可以通过下面一些结合了代码设计模式与多线程手段实现的框架和服务来实现异步编程:

  1. 使用CompletableFuture异步框架
  2. 使用Spring ApplicationEvent 事件,事件继承ApplicationEvent,监听器继承ApplicationListener
  3. 使用Springboot的@EnableAsync和@Async注解
  4. 使用消息队列
    我们重点关注基于事件驱动的异步编程模式。

基于时间驱动的异步编程模式(EAP Event-based Asynchronous Pattern )

在应用层面它一般与非阻塞io结合来实现应用层面的异步编程;在业务层面,对于需要轮询,阻塞的场景,事件驱动模式可以很好的解决这类性能问题。但是它也存在一些问题,比如:流程不清晰,逻辑复杂,代码可读性弱,调试困难等。
使用事件驱动模式来追求并发性能的业务一般都有以下特征:

  1. 产品业务存在着若干主流程
  2. 主流程包含若干子流程
  3. 子流程执行存在一定的次序
  4. 子流程之间存在联系(相互调用)
  5. 子流程围绕着同一个上下文做操作

实现EAP

首先是涉及到的类定义:
EventType(事件类型,,枚举类型,与产品业务相关)
Event(事件接口,包含事件类型EventType属性,定义事件业务处理等方法)
EventHandler(事件处理器,实现事件Event接口,包含各类事件的实际业务)
EventDispacher(事件分发器,根据事件类型来获取事件监听器并触发事件处理方法)
EventListener (事件监听器,向分发器注册监听器并且包装事件处理器(handler)的方法)
EventDrivenException (事件异常处理,事件处理的同一异常处理类)
EventAnnotation EventListenerManager (事件注解和事件监听器管理器,通过注解(包含事件类型)完成对监听器的自动注册)
EventWork (事件轮询执行线程)
NettySever NettyServerHandler NettyServerClient NettyClientHandler (应用层异步非阻塞通信框架)
实现该框架的几种方式:
基本模式: Event EventType EventHandler EventDispacher
在这里插入图片描述
EventDispacher类初始化的时候启动处理固定现场池用来消费时间队列的事件数据。通过事件内的时间类型找到对应的时间处理器执行对应时间的实际业务。
EventDispacher类暴露出去时间发布方法方便事件发布。
EventDispacher类暴露出去事件处理器注册方法方便处理器注册。
升级模式: Event EventType EventHandler EventListener EventDispacher EventAnnotation EventListenerManager EventDrivenException EventWork
在这里插入图片描述
EventDispacher类初始化的时候启动处理固定现场池,创建EventWorker线程用来消费时间队列的事件数据,通过事件内的事件类型找到对应的事件监听器调用包装的事件处理器来执行对应事件的实际业务,并且通过EventDrivenException 来处理事件触发过程中的异常。
EventDispacher类暴露出去时间发布方法方便事件发布。
EventDispacher类暴露出去事件处理器注册方法给EventListenerManager 调用。
EventListenerManager初始化的时候扫描标有EventAnnotation注解的事件监听器类,然后实例化事件监听器,并按照注解中的事件类型注册到EventDispacher。
高级模式: Event EventType EventHandler EventListener EventDispacher EventAnnotation EventListenerManager EventDrivenException EventWork NettySever NettyServerClient
在这里插入图片描述
事件由各类事件生成逻辑和时间处理器完成创建并发布。
各类事件处理器向NettyClientHandler和NettyServerHandler注册,而他们分别与NettyServerClient和NettyServer绑定,他们负责在应用之间传输需要发布的事件信息并将接收到的事件信息交给对应的Handler处理。
NettyClientHandler和NettyServerHandler接收到NettyServerClient和NettyServer发送过来的事件处理请求,通过事件中包含的事件类型,找到对应的事件处理器完成对事件的执行。
各类事件处理器在负责自身事件业务处理的同时,还承担了下游事件的创建与发布工作。这样在应用之间的结合Netty的事件驱动编程框架就完成了构建。

DolphinSchedule结合Netty实现Master与Worker之间的高性能处理能力的设计

方案设计

DolphinSchedule中Master和Worker之间基于事件驱动的并发处理设计如下图,设计方案与上一章节的实现基于事件驱动的编程模型的第三种设计一致。实际代码所涉及到的事件处理器只涉及任务实例执行流程相关的代码,日志,告警等业务逻辑虽然也是在这套逻辑的基础之上,但是不在此处做讲解。
在这里插入图片描述
流程涉及类以及作用如下:
CommandType(Master与Work通信的命令类型,本文只涉及TASK_EXEUTE_REQUEST,TASK_KILL_REQUEST,TASK_EXECUTE_RUNNING,TASK_EXECUTE_RESPONSE,TASK_EXECUTE_RUNNING_ACK,TASK_EXECUTE_RESPONSE_ACK,TASK_KILL_RESPONSE这些类型,其他的不展开)

NettyRequestProcessor (事件处理器接口,定义了通过Channel和Commond来执行实际事件业务的方法)

TaskExecuteProcessor (任务实例执行的事件处理器,注册在WorkerServer的nettyserver上面,由TASK_EXEUTE_REQUEST命令类型触发,主要业务是触发TASK_EXECUTE_RUNNING命令,执行任务实例业务,触发TASK_EXECUTE_RESPONSE命令)

TaskExecuteRunningProcessor (实例正在运行事件处理器,注册在MasterServer的nettyServer上面,由TASK_EXECUTE_RUNNING命令类型触发,主要业务是更新任务实例状态,触发TASK_EXECUTE_RUNNING_ACK命令)

TaskExecuteRunningAckProcessor (监控实例正在运行事件的事件处理器,注册在TaskCallbackService包装的masterserver通信的nettyremotingclient上面,由TASK_EXECUTE_RUNNING_ACK触发,主要业务是刷新清理本地缓存信息)

TaskExecuteResponseProcessor (实例运行结果事件处理器,注册在MasterServer的nettyServer上面,由TASK_EXECUTE_RESPONSE命令类型触发,主要业务是更新任务实例状态,触发TASK_EXECUTE_RESPONSE_ACK命令)

TaskExecuteResponseAckProcessor (监控实例运行结果事件的事件处理器,注册在TaskCallbackService包装的master的nettyremotingclient上面,由TASK_EXECUTE_RESPONSE_ACK触发,主要业务是刷新清理本地缓存信息)

TaskKillProcessor (实例杀死事件处理器,注册在WorkerServer的nettyserver上面,由TASK_KILL_REQUEST命令类型触发,主要业务是杀死实例进程,触发TASK_KILL_RESPONSE命令)

TaskKillResponseProcessor (实例杀死处理结果的事件处理器,注册在MasterServer的nettyServer上面 上面,由TASK_KILL_RESPONSE触发,主要业务是日志记录处理结果)

NettyRemotingClient (netty的Bootstrap代理类,负责与Nettyserver双向通信,提供与netty服务端通信的方法,该方法中会使用rpc通信参数调用注册在NettyClientHandler中的事件处理器的事件业务,同时还包装了NettyClientHandler类中通过命令类型注册事件处理器的方法)

NettyClientHandler (netty客户端事件处理器的包装类,继承了netty的ChannelInboundHandlerAdapter类实现了channelRead方法,通过nettyserver传递的命令类型获取事件处理器来执行对应事件业务,同时还提供了通过命令类型注册客户端事件处理器的方法供NettyRemotingClient 的registerProcessor方法调用)

NettyExecutorManager(执行器管理器类,包装了NettyRemotingClient 与nettyServer通信的send方法,同时提供任务实例分发,超时操作等业务方法给外部调用)

MasterSchedulerService(主节点的任务调度业务类,负责生成任务实例,发送TASK_EXEUTE_REQUEST命令,负责实例分发(实例分发是创建DISPATCH类型的任务事件,并且将相关事件上线文参数推进任务分发队列TaskPriorityQueue)和实例状态的变更工作)

TaskPriorityQueueConsumer(消费TaskPriorityQueue队列的数据,通过NettyExecutorManager包含的NettyRemotingClient来与WokerServer包含的NettyRemotingServer进行交互,完成任务实例分发事件的处理)

NettyRemotingServer (Netty的ServerBootstrap代理类,负责与nettyclient双向通信。包装了NettyServerHandler 通过命令类型注册服务端事件处理器的方法)

NettyServerHandler (netty服务端事件处理器的包装类,继承了netty的ChannelInboundHandlerAdapter类实现了channelRead方法,通过nettyclient传递的命令类型获取事件处理器来执行对应事件业务,同时还提供了通过命令类型注册客户端事件处理器的方法供NettyRemotingServer 的registerProcessor方法调用)

TaskCallbackService (任务实例事件回调服务类,包装了与MasterServer通信的NettyRemotingClient,负责与Master回调任务实例相关的事件,比如 TASK_EXECUTE_RUNNING_ACK,TASK_EXECUTE_RESPONSE_ACK等)

上图包含了任务实例的分发到执行完成流程和任务实例的杀死流程。整个调用流程如下:
实例分发执行流程

  1. MasterServer和WorkServer启动的时候都会启动一个内置的netty服务端和客户端,并且将与各自绑定的事件处理器在初始化方法中注册到对应的NettyServerHandler 与NettyClientHandler 上面。Master的netty服务端被MasterServer本身持有,与Worker通信的netty客户端由NettyExecutorManager持有。Worker的netty服务端被WorkerServer本身持有,与Master通信的netty客户端由TaskCallbackService 持有。
  2. 对于任务实例执行流程,首先任务调度器会生成好任务对应的实例信息和TASK_EXEUTE_REQUEST事件并推送到任务实例的分发队列中(相关业务不在此展开),TaskPriorityQueueConsumer会消费上述队列的数据通NettyExecutorManager包含的NettyRemotingClient与WorkerServer通信,此过程包含负载均衡,nettyclient初始化,channel缓存,与work通信等步骤。
  3. WorkerServer在接收到TASK_EXEUTE_REQUEST事件请求之后,会找到注册在NettyServerHandler中与TASK_EXEUTE_REQUEST对应的事件处理器TaskExecuteProcessor,并执行该事件的业务。TaskExecuteProcessor的主要工作有三部,第一步,生成TASK_EXECUTE_RUNNING命令通过TaskCallbackService 回传给MasterServer,第二部,执行任务实例业务(各类任务实例的具体任务执行逻辑不在此处展开),第三部,生成TASK_EXECUTE_RESPONSE命令通过回TaskCallbackService 传给MasterServer。
  4. MasterServer接收到TASK_EXECUTE_RUNNING命令之后,会从NettyServerHandler 中找到注册在上面与TASK_EXECUTE_RUNNING对应的时间处理类TaskExecuteRunningProcessor,并执行该事件的业务。TaskExecuteRunningProcessor的主要工作分三步,第一步,创建RUNNING类型的TaskEvent(另外的时间处理设计,不展开讲解),第二步,更新任务实例状态,第三步,生成TASK_EXECUTE_RUNNING_ACK命令,并通过缓存的Channel回传给绑定在TaskCallbackService中的NettyRemotingClient。
  5. TaskCallbackService接收到TASK_EXECUTE_RUNNING_ACK命令之后,会从NettyClientHandler中找到与TASK_EXECUTE_RUNNING_ACK对应的事件处理器TaskExecuteRunningAckProcessor。TaskExecuteRunningAckProcessor的主要工作就是移除本地的与实例运行相关的缓存信息。
  6. MasterServer接收到TASK_EXECUTE_RESPONSE命令之后,会从NettyServerHandler 中找到注册在上面与TaskExecuteResponseProcessor 对应的时间处理类TaskExecuteRunningProcessor,并执行该事件的业务。TaskExecuteResponseProcessor的主要业务主要分三步,第一步,创建Result类型的TaskEvent(另外的时间处理设计,不展开讲解),第二步,更新任务实例信息,第三步,生成TASK_EXECUTE_RESPONSE_ACK命令并通过缓存的Channel回传给绑定在TaskCallbackService中的NettyRemotingClient。
  7. TaskCallbackService接收到TASK_EXECUTE_RESPONSE_ACK命令之后,会从NettyClientHandler中找到与TASK_EXECUTE_RESPONSE_ACK对应的事件处理器TaskExecuteResponseAckProcessor 。TaskExecuteResponseAckProcessor 的主要工作就是移除本地的与实例运行相关的缓存信息和channel信息。

实例停止和超时终止流程

  1. 通过api服务或者容错业务生成 TASK_KILL_REQUEST命令,然后通过NettyExecutorManager的send方法与WorkerServer通信。
  2. WorkerServer在接收到TASK_KILL_REQUEST事件请求之后,会找到注册在NettyServerHandler中与TASK_KILL_REQUEST对应的事件处理器TaskKillProcessor ,并执行该事件的业务。TaskKillProcessor的主要工作有两步,第一,杀死进程,第二步,生成TASK_KILL_RESPONSE命令,通过通过TaskCallbackService 回传给MasterServer。
  3. MasterServer接收到TASK_KILL_RESPONSE命令之后,会从NettyServerHandler 中找到注册在上面与TASK_KILL_RESPONSE对应的时间处理类TaskKillResponseProcessor ,并执行该事件的业务。TaskKillResponseProcessor主要工作就是日志记录实例杀死的结果。

代码实现

代码流程只展示Master与worker通信,然后worker回调Master的单一流程,不包含上述流程的全部过程,不能文章过于冗长,有兴趣可自行研读。

  1. MasterServer启动并注册与之绑定的事件处理器taskExecuteResponseProcessor,taskExecuteRunningProcessor,taskKillResponseProcessor。
@PostConstruct
   public void run() throws SchedulerException {
       // init remoting server
       NettyServerConfig serverConfig = new NettyServerConfig();
       serverConfig.setListenPort(masterConfig.getListenPort());
       this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
       this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
       this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
       this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
       this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
       this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
       this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
       this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
 
       // logger server
       this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
       this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
       this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
       this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
 
       this.nettyRemotingServer.start();
 
       // install task plugin
       this.taskPluginManager.installPlugin();
 
       // self tolerant
       this.masterRegistryClient.init();
       this.masterRegistryClient.start();
       this.masterRegistryClient.setRegistryStoppable(this);
 
       this.masterSchedulerService.init();
       this.masterSchedulerService.start();
 
       this.eventExecuteService.start();
       this.failoverExecuteThread.start();
 
       this.scheduler.start();
 
       Runtime.getRuntime().addShutdownHook(new Thread(() -> {
           if (Stopper.isRunning()) {
               close("shutdownHook");
           }
       }));
   }
 
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
       this.registerProcessor(commandType, processor, null);
   }
 
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
       this.serverHandler.registerProcessor(commandType, processor, executor);
   }
  1. WorkerServer启动并注册与之绑定的事件处理器taskExecuteProcessor,taskExecuteRunningAckProcessor,taskExecuteResponseAckProcessor,taskKillProcessor。
@PostConstruct
    public void run() {
        // init remoting server
        NettyServerConfig serverConfig = new NettyServerConfig();
        serverConfig.setListenPort(workerConfig.getListenPort());
        this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);
 
        // logger server
        this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
 
        this.nettyRemotingServer.start();
 
        // install task plugin
        this.taskPluginManager.installPlugin();
 
        // worker registry
        try {
            this.workerRegistryClient.registry();
            this.workerRegistryClient.setRegistryStoppable(this);
            Set<String> workerZkPaths = this.workerRegistryClient.getWorkerZkPaths();
 
            this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
 
        // task execute manager
        this.workerManagerThread.start();
 
        // retry report task status
        this.retryReportTaskStatusThread.start();
 
        /*
         * registry hooks, which are called before the process exits
         */
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (Stopper.isRunning()) {
                close("shutdownHook");
            }
        }));
    }
 
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
        this.registerProcessor(commandType, processor, null);
    }
 
 public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
        this.serverHandler.registerProcessor(commandType, processor, executor);
    }
  1. NettyExecutorManager初始化的初始化NettyRemotingClient
public NettyExecutorManager() {
        final NettyClientConfig clientConfig = new NettyClientConfig();
        this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
    }
  1. TaskCallbackService初始化的时候创建NettyRemotingClient,并且注册与之绑定的事件处理器taskExecuteRunningProcessor,taskExecuteResponseAckProcessor

public TaskCallbackService() {
       final NettyClientConfig clientConfig = new NettyClientConfig();
       this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
       this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningProcessor);
       this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
   }
  1. TaskPriorityQueueConsumer消费定时器产生的实例分发队列数据,最后会调用executorManager.execute(context)实际执行与Worker的通信逻辑。
@Override
    public void run() {
        int fetchTaskNum = masterConfig.getDispatchTaskNumber();
        while (Stopper.isRunning()) {
            try {
                List<TaskPriority> failedDispatchTasks = this.batchDispatch(fetchTaskNum);
 
                if (!failedDispatchTasks.isEmpty()) {
                    for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
                        taskPriorityQueue.put(dispatchFailedTask);
                    }
                    // If there are tasks in a cycle that cannot find the worker group,
                    // sleep for 1 second
                    if (taskPriorityQueue.size() <= failedDispatchTasks.size()) {
                        TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
                    }
                }
            } catch (Exception e) {
                logger.error("dispatcher task error", e);
            }
        }
    }
 
private List<TaskPriority> batchDispatch(int fetchTaskNum) throws TaskPriorityQueueException, InterruptedException {
        List<TaskPriority> failedDispatchTasks = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch latch = new CountDownLatch(fetchTaskNum);
 
        for (int i = 0; i < fetchTaskNum; i++) {
            TaskPriority taskPriority = taskPriorityQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS);
            if (Objects.isNull(taskPriority)) {
                latch.countDown();
                continue;
            }
 
            consumerThreadPoolExecutor.submit(() -> {
                boolean dispatchResult = this.dispatchTask(taskPriority);
                if (!dispatchResult) {
                    failedDispatchTasks.add(taskPriority);
                }
                latch.countDown();
            });
        }
 
        latch.await();
 
        return failedDispatchTasks;
    }
 
protected boolean dispatchTask(TaskPriority taskPriority) {
        boolean result = false;
        try {
            TaskExecutionContext context = taskPriority.getTaskExecutionContext();
            ExecutionContext executionContext = new ExecutionContext(toCommand(context), ExecutorType.WORKER, context.getWorkerGroup());
 
            if (isTaskNeedToCheck(taskPriority)) {
                if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
                    // when task finish, ignore this task, there is no need to dispatch anymore
                    return true;
                }
            }
 
            result = dispatcher.dispatch(executionContext);
 
            if (result) {
                addDispatchEvent(context, executionContext);
            }
        } catch (RuntimeException e) {
            logger.error("dispatch error: ", e);
        } catch (ExecuteException e) {
            logger.error("dispatch error: {}", e.getMessage());
        }
        return result;
    }
 
public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
        /**
         * get executor manager
         */
        ExecutorManager<Boolean> executorManager = this.executorManagers.get(context.getExecutorType());
        if (executorManager == null) {
            throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType());
        }
 
        /**
         * host select
         */
 
        Host host = hostManager.select(context);
        if (StringUtils.isEmpty(host.getAddress())) {
            throw new ExecuteException(String.format("fail to execute : %s due to no suitable worker, "
                            + "current task needs worker group %s to execute",
                    context.getCommand(),context.getWorkerGroup()));
        }
        context.setHost(host);
        executorManager.beforeExecute(context);
        try {
            /**
             * task execute
             */
            return executorManager.execute(context);
        } finally {
            executorManager.afterExecute(context);
        }
    }
  1. Master与Worker通信,最终会通过NettyExecutorManager中的NettyRemotingClient的send方法来与Worker的nettyServer通信。
@Override
    public Boolean execute(ExecutionContext context) throws ExecuteException {
 
        /**
         *  all nodes
         */
        Set<String> allNodes = getAllNodes(context);
 
        /**
         * fail nodes
         */
        Set<String> failNodeSet = new HashSet<>();
 
        /**
         *  build command accord executeContext
         */
        Command command = context.getCommand();
 
        /**
         * execute task host
         */
        Host host = context.getHost();
        boolean success = false;
        while (!success) {
            try {
                doExecute(host, command);
                success = true;
                context.setHost(host);
            } catch (ExecuteException ex) {
                logger.error(String.format("execute command : %s error", command), ex);
                try {
                    failNodeSet.add(host.getAddress());
                    Set<String> tmpAllIps = new HashSet<>(allNodes);
                    Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
                    if (remained != null && remained.size() > 0) {
                        host = Host.of(remained.iterator().next());
                        logger.error("retry execute command : {} host : {}", command, host);
                    } else {
                        throw new ExecuteException("fail after try all nodes");
                    }
                } catch (Throwable t) {
                    throw new ExecuteException("fail after try all nodes");
                }
            }
        }
 
        return success;
    }
 
public void doExecute(final Host host, final Command command) throws ExecuteException {
        /**
         * retry count,default retry 3
         */
        int retryCount = 3;
        boolean success = false;
        do {
            try {
                nettyRemotingClient.send(host, command);
                success = true;
            } catch (Exception ex) {
                logger.error(String.format("send command : %s to %s error", command, host), ex);
                retryCount--;
                ThreadUtils.sleep(100);
            }
        } while (retryCount >= 0 && !success);
 
        if (!success) {
            throw new ExecuteException(String.format("send command : %s to %s error", command, host));
        }
    }
 
  public void send(final Host host, final Command command) throws RemotingException {
        Channel channel = getChannel(host);
        if (channel == null) {
            throw new RemotingException(String.format("connect to : %s fail", host));
        }
        try {
            ChannelFuture future = channel.writeAndFlush(command).await();
            if (future.isSuccess()) {
                logger.debug("send command : {} , to : {} successfully.", command, host.getAddress());
            } else {
                String msg = String.format("send command : %s , to :%s failed", command, host.getAddress());
                logger.error(msg, future.cause());
                throw new RemotingException(msg);
            }
        } catch (Exception e) {
            logger.error("Send command {} to address {} encounter error.", command, host.getAddress());
            throw new RemotingException(String.format("Send command : %s , to :%s encounter error", command, host.getAddress()), e);
        }
    }
  1. WorkerServer接收到NettyExecutorManager发送过来的命令,通过NettyServerHandler或去注册在上面的事件处理器TaskExecuteProcessor ,通过事件梳理器的process方法执行实际的事件业务。
@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        processReceived(ctx.channel(), (Command) msg);
    }
 
private void processReceived(final Channel channel, final Command msg) {
        final CommandType commandType = msg.getType();
        if (CommandType.HEART_BEAT.equals(commandType)) {
            if (logger.isDebugEnabled()) {
                logger.debug("server receive heart beat from: host: {}", ChannelUtils.getRemoteAddress(channel));
            }
            return;
        }
        final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(commandType);
        if (pair != null) {
            Runnable r = () -> {
                try {
                    pair.getLeft().process(channel, msg);
                } catch (Exception ex) {
                    logger.error("process msg {} error", msg, ex);
                }
            };
            try {
                pair.getRight().submit(r);
            } catch (RejectedExecutionException e) {
                logger.warn("thread pool is full, discard msg {} from {}", msg, ChannelUtils.getRemoteAddress(channel));
            }
        } else {
            logger.warn("commandType {} not support", commandType);
        }
    }
  1. TaskExecuteProcessor的process逻辑分三步,第一步,生成TASK_EXECUTE_RUNNING命令通过TaskCallbackService 回传给MasterServer,第二部,执行任务实例业务(各类任务实例的具体任务执行逻辑不在此处展开),第三部,生成TASK_EXECUTE_RESPONSE命令通过回TaskCallbackService 传给MasterServer。此过程都由任务执行线程TaskExecuteThread完成。
@Override
    public void process(Channel channel, Command command) {
        Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
                String.format("invalid command type : %s", command.getType()));
 
        TaskExecuteRequestCommand taskRequestCommand = JSONUtils.parseObject(
                command.getBody(), TaskExecuteRequestCommand.class);
 
        if (taskRequestCommand == null) {
            logger.error("task execute request command is null");
            return;
        }
        logger.info("task execute request command : {}", taskRequestCommand);
 
        String contextJson = taskRequestCommand.getTaskExecutionContext();
        TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class);
 
        if (taskExecutionContext == null) {
            logger.error("task execution context is null");
            return;
        }
 
        // set cache, it will be used when kill task
        TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
 
        // todo custom logger
 
        taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort()));
        taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
 
        if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) {
            if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
                OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
            }
 
            // check if the OS user exists
            if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) {
                logger.error("tenantCode: {} does not exist, taskInstanceId: {}",
                        taskExecutionContext.getTenantCode(), taskExecutionContext.getTaskInstanceId());
                TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
                taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
                taskExecutionContext.setEndTime(new Date());
                taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
                return;
            }
 
            // local execute path
            String execLocalPath = getExecLocalPath(taskExecutionContext);
            logger.info("task instance local execute path : {}", execLocalPath);
            taskExecutionContext.setExecutePath(execLocalPath);
 
            try {
                FileUtils.createWorkDirIfAbsent(execLocalPath);
            } catch (Throwable ex) {
                logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}", execLocalPath, taskExecutionContext.getTaskInstanceId());
                logger.error("create executeLocalPath fail", ex);
                TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
                taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
                taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
                return;
            }
        }
 
        taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
                new NettyRemoteChannel(channel, command.getOpaque()));
 
        // delay task process
        long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L);
        if (remainTime > 0) {
            logger.info("delay the execution of task instance {}, delay time: {} s", taskExecutionContext.getTaskInstanceId(), remainTime);
            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
            taskExecutionContext.setStartTime(null);
            taskCallbackService.sendTaskExecuteDelayCommand(taskExecutionContext);
        }
 
        // submit task to manager
        boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager));
        if (!offer) {
            logger.error("submit task to manager error, queue is full, queue size is {}, taskInstanceId: {}",
                    workerManager.getDelayQueueSize(), taskExecutionContext.getTaskInstanceId());
            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
            taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
        }
    }
 
 @Override
    public void run() {
        if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUCCESS);
            taskExecutionContext.setStartTime(new Date());
            taskExecutionContext.setEndTime(new Date());
            TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
            taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
            return;
        }
 
        try {
            logger.info("script path : {}", taskExecutionContext.getExecutePath());
            if (taskExecutionContext.getStartTime() == null) {
                taskExecutionContext.setStartTime(new Date());
            }
            logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());
 
            // callback task execute running
            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
            taskCallbackService.sendTaskExecuteRunningCommand(taskExecutionContext);
 
            // copy hdfs/minio file to local
            List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources());
            if (!fileDownloads.isEmpty()){
                downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads);
            }
 
            taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
            taskExecutionContext.setDefinedParams(getGlobalParamsMap());
 
            taskExecutionContext.setTaskAppId(String.format("%s_%s",
                    taskExecutionContext.getProcessInstanceId(),
                    taskExecutionContext.getTaskInstanceId()));
 
            preBuildBusinessParams();
 
            TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
            if (null == taskChannel) {
                throw new ServiceException(String.format("%s Task Plugin Not Found,Please Check Config File.", taskExecutionContext.getTaskType()));
            }
            String taskLogName = LoggerUtils.buildTaskId(taskExecutionContext.getFirstSubmitTime(),
                    taskExecutionContext.getProcessDefineCode(),
                    taskExecutionContext.getProcessDefineVersion(),
                    taskExecutionContext.getProcessInstanceId(),
                    taskExecutionContext.getTaskInstanceId());
            taskExecutionContext.setTaskLogName(taskLogName);
 
            // set the name of the current thread
            Thread.currentThread().setName(taskLogName);
 
            task = taskChannel.createTask(taskExecutionContext);
 
            // task init
            this.task.init();
 
            //init varPool
            this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());
 
            // task handle
            this.task.handle();
 
            // task result process
            if (this.task.getNeedAlert()) {
                sendAlert(this.task.getTaskAlertInfo(), this.task.getExitStatus().getCode());
            }
 
            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.of(this.task.getExitStatus().getCode()));
            taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
            taskExecutionContext.setProcessId(this.task.getProcessId());
            taskExecutionContext.setAppIds(this.task.getAppIds());
            taskExecutionContext.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
            logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus());
        } catch (Throwable e) {
            logger.error("task scheduler failure", e);
            kill();
            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
            taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
            taskExecutionContext.setProcessId(this.task.getProcessId());
            taskExecutionContext.setAppIds(this.task.getAppIds());
        } finally {
            TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
            taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
            clearTaskExecPath();
        }
    }
  1. taskCallbackService的命令回调Master逻辑如下,以TASK_EXECUTE_RUNNING为例。
public void sendTaskExecuteRunningCommand(TaskExecutionContext taskExecutionContext) {
        TaskExecuteRunningCommand command = buildTaskExecuteRunningCommand(taskExecutionContext);
        // add response cache
        ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), Event.RUNNING);
        send(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
    }
 
public void send(int taskInstanceId, Command command) {
        NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
        if (nettyRemoteChannel != null) {
            nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener() {
 
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        // remove(taskInstanceId);
                        return;
                    }
                }
            });
        }
    }
  1. Master在接收到taskCallbackService的回调命令TASK_EXECUTE_RUNNING之后,会通过NettyServerHandler或去注册在上面的事件处理器TaskExecuteRunningProcessor,通过事件梳理器的process方法执行实际的事件业务。
@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        processReceived(ctx.channel(), (Command) msg);
    }
private void processReceived(final Channel channel, final Command msg) {
        final CommandType commandType = msg.getType();
        if (CommandType.HEART_BEAT.equals(commandType)) {
            if (logger.isDebugEnabled()) {
                logger.debug("server receive heart beat from: host: {}", ChannelUtils.getRemoteAddress(channel));
            }
            return;
        }
        final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(commandType);
        if (pair != null) {
            Runnable r = () -> {
                try {
                    pair.getLeft().process(channel, msg);
                } catch (Exception ex) {
                    logger.error("process msg {} error", msg, ex);
                }
            };
            try {
                pair.getRight().submit(r);
            } catch (RejectedExecutionException e) {
                logger.warn("thread pool is full, discard msg {} from {}", msg, ChannelUtils.getRemoteAddress(channel));
            }
        } else {
            logger.warn("commandType {} not support", commandType);
        }
    }
 
 @Override
    public void process(Channel channel, Command command) {
        Preconditions.checkArgument(CommandType.TASK_EXECUTE_RUNNING == command.getType(), String.format("invalid command type : %s", command.getType()));
        TaskExecuteRunningCommand taskExecuteRunningCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteRunningCommand.class);
        logger.info("taskExecuteRunningCommand: {}", taskExecuteRunningCommand);
 
        TaskEvent taskEvent = TaskEvent.newRunningEvent(taskExecuteRunningCommand, channel);
        taskEventService.addEvent(taskEvent);
    }
 public void addEvent(TaskEvent taskEvent) {
        taskExecuteThreadPool.submitTaskEvent(taskEvent);
    }
public void submitTaskEvent(TaskEvent taskEvent) {
        if (!processInstanceExecCacheManager.contains(taskEvent.getProcessInstanceId())) {
            logger.warn("workflowExecuteThread is null, event: {}", taskEvent);
            return;
        }
        if (!taskExecuteThreadMap.containsKey(taskEvent.getProcessInstanceId())) {
            TaskExecuteThread taskExecuteThread = new TaskExecuteThread(
                    taskEvent.getProcessInstanceId(),
                    processService, workflowExecuteThreadPool,
                    processInstanceExecCacheManager,
                    dataQualityResultOperator);
            taskExecuteThreadMap.put(taskEvent.getProcessInstanceId(), taskExecuteThread);
        }
        TaskExecuteThread taskExecuteThread = taskExecuteThreadMap.get(taskEvent.getProcessInstanceId());
        if (taskExecuteThread != null) {
            taskExecuteThread.addEvent(taskEvent);
        }
    }
public boolean addEvent(TaskEvent event) {
        if (event.getProcessInstanceId() != this.processInstanceId) {
            logger.warn("event would be abounded, task instance id:{}, process instance id:{}, this.processInstanceId:{}",
                    event.getTaskInstanceId(), event.getProcessInstanceId(), this.processInstanceId);
            return false;
        }
        return this.events.add(event);
    }
  1. 到这里创建的TaskEvent会被TaskExecuteThreadPool去消费,最终的RUNNING事件会完成实例状态的更新和下一个事件TASK_EXECUTE_RUNNING_ACK的回传触发。
 class TaskEventHandlerThread extends Thread {
 
        @Override
        public void run() {
            logger.info("event handler thread started");
            while (Stopper.isRunning()) {
                try {
                    taskExecuteThreadPool.eventHandler();
                    TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    logger.error("event handler thread error", e);
                }
            }
        }
    }
public void eventHandler() {
        for (TaskExecuteThread taskExecuteThread: taskExecuteThreadMap.values()) {
            executeEvent(taskExecuteThread);
        }
    }
 
 public void executeEvent(TaskExecuteThread taskExecuteThread) {
        if (taskExecuteThread.eventSize() == 0) {
            return;
        }
        if (multiThreadFilterMap.containsKey(taskExecuteThread.getKey())) {
            return;
        }
        multiThreadFilterMap.put(taskExecuteThread.getKey(), taskExecuteThread);
        ListenableFuture future = this.submitListenable(taskExecuteThread::run);
        future.addCallback(new ListenableFutureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                logger.error("handle event {} failed: {}", taskExecuteThread.getProcessInstanceId(), ex);
                if (!processInstanceExecCacheManager.contains(taskExecuteThread.getProcessInstanceId())) {
                    taskExecuteThreadMap.remove(taskExecuteThread.getProcessInstanceId());
                    logger.info("remove process instance: {}", taskExecuteThread.getProcessInstanceId());
                }
                multiThreadFilterMap.remove(taskExecuteThread.getKey());
            }
 
            @Override
            public void onSuccess(Object result) {
                logger.info("persist events {} succeeded.", taskExecuteThread.getProcessInstanceId());
                if (!processInstanceExecCacheManager.contains(taskExecuteThread.getProcessInstanceId())) {
                    taskExecuteThreadMap.remove(taskExecuteThread.getProcessInstanceId());
                    logger.info("remove process instance: {}", taskExecuteThread.getProcessInstanceId());
                }
                multiThreadFilterMap.remove(taskExecuteThread.getKey());
            }
        });
    }
 
  public void run() {
        while (!this.events.isEmpty()) {
            TaskEvent event = this.events.peek();
            try {
                persist(event);
            } catch (Exception e) {
                logger.error("persist error, event:{}, error: {}", event, e);
            } finally {
                this.events.remove(event);
            }
        }
    }
 
private void persist(TaskEvent taskEvent) {
        Event event = taskEvent.getEvent();
        int taskInstanceId = taskEvent.getTaskInstanceId();
        int processInstanceId = taskEvent.getProcessInstanceId();
 
        TaskInstance taskInstance;
        WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
        if (workflowExecuteThread != null && workflowExecuteThread.checkTaskInstanceById(taskInstanceId)) {
            taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId);
        } else {
            taskInstance = processService.findTaskInstanceById(taskInstanceId);
        }
 
        switch (event) {
            case DISPATCH:
                handleDispatchEvent(taskEvent, taskInstance);
                // dispatch event do not need to submit state event
                return;
            case DELAY:
            case RUNNING:
                handleRunningEvent(taskEvent, taskInstance);
                break;
            case RESULT:
                handleResultEvent(taskEvent, taskInstance);
                break;
            default:
                throw new IllegalArgumentException("invalid event type : " + event);
        }
 
        StateEvent stateEvent = new StateEvent();
        stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId());
        stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId());
        stateEvent.setExecutionStatus(taskEvent.getState());
        stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
        workflowExecuteThreadPool.submitStateEvent(stateEvent);
    }
  private void handleRunningEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
        Channel channel = taskEvent.getChannel();
        try {
            if (taskInstance != null) {
                if (taskInstance.getState().typeIsFinished()) {
                    logger.warn("task is finish, running event is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState());
                } else {
                    taskInstance.setState(taskEvent.getState());
                    taskInstance.setStartTime(taskEvent.getStartTime());
                    taskInstance.setHost(taskEvent.getWorkerAddress());
                    taskInstance.setLogPath(taskEvent.getLogPath());
                    taskInstance.setExecutePath(taskEvent.getExecutePath());
                    taskInstance.setPid(taskEvent.getProcessId());
                    taskInstance.setAppLink(taskEvent.getAppIds());
                    processService.saveTaskInstance(taskInstance);
                }
            }
            // if taskInstance is null (maybe deleted) or finish. retry will be meaningless . so ack success
            TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
            channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
        } catch (Exception e) {
            logger.error("worker ack master error", e);
            TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
            channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
        }
    }

中间涉及到的TaskEvent又是另一套基于事件驱动编程模式的设计,与实际产品业务强相关,不在此处展开。

到此Master与worker通信,然后回调Master的全部流程就介绍完毕。通过代码我们可以看出,DolphinSchedule的在以事件驱动的任务实例调度上设计很优秀,各个业务相互独立,各个子流程松耦合,交互与触发都是异步处理,能够提供很高的并发处理能力。

总结

对于调度系统存在的任务与实例的两条主线迫切需要采用基于事件驱动的模式来实现高性能的并发处理能力。

从任务的扫描,到实例产生,实例触发参数生成,实例触发,实例执行,实例状态流转,完全可以拆分成不同的事件,相关业务通过代码解耦,独立成对应的事件处理器。通过代表实际业务的事件类型定义来驱动事件处理器的运行。

调度管理系统与调度执行器之间的通信与Netty服务端与客户端绑定,完成应用服务之间的基于事件驱动的高并发的任务处理能力。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/338848.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

内存访问局部性特征

分享一道360的C语言笔试题。x是一个行列均为1000的二维数组&#xff0c;下面代码运行效率最高的是哪个&#xff1f; 二维数组大家都很熟悉&#xff0c;正常人遍历二维数组都是一行一行来的&#xff0c;为什么很少有人按列去遍历&#xff1f; 这道笔试题其实考察的就是遍历效率…

#车载基础软件——AUTOSAR AP技术形态

车载基础软件——AUTOSAR AP技术形态 我是穿拖鞋的汉子! 今天是2023年2月11日,时间好快,疫情解封已好几个月,生活节奏也在逐渐恢复到三年前的节奏。可能是感觉疫情与自己距离变远了,大家也开始慢慢的不再恐惧! 老规矩分享一段喜欢的文字,避免自己成为高知识低文化的工…

【安全】nginx反向代理+负载均衡上传webshell

目录 一、负载均衡反向代理下上传webshell Ⅰ、环境搭建 ①下载蚁剑&#xff0c;于github获取官方版&#xff1a; ②下载docker&docker-compose ③结合前面启动环境 ④验证 负载均衡下webshell上传 一、负载均衡反向代理下上传webshell 什么是反向代理&#xff1f; 通常的代…

大数据框架之Hadoop:入门(三)Hadoop运行环境搭建(开发重点)

3.1虚拟机环境准备 详见&#xff1a;yiluohan1234/vagrant_bigdata_cluster: 利用virtualbox快速搭建大数据测试环境 (github.com) 单纯只是安装虚拟机的话&#xff0c;注释掉40到115行。按照教程安装即可。 在 /opt 目录下创建 module、 software 文件夹 [roothdp101 ~]# m…

一、Java并发编程之线程、synchronized

黑马课程 文章目录1. Java线程1.1 创建和运行线程方法一&#xff1a;Thread方法二&#xff1a;Runnable&#xff08;推荐&#xff09;lambda精简Thread和runnable原理方法三&#xff1a;FutureTask配合Thread1.2 查看进程和线程的方法1.3 线程运行原理栈与栈帧线程上下文切换1.…

1. SpringMVC 简介

文章目录1. SpringMVC 概述2. SpringMVC 入门案例2.1 入门案例2.2 入门案例工作流程3. bean 加载控制4. PostMan 工具1. SpringMVC 概述 SpringMVC 与 Servlet 功能等同&#xff0c;均属于 Web 层开发技术。SpringMVC 是 Spring 框架的一部分。 对于 SpringMVC&#xff0c;主…

Python导入模块的3种方式

很多初学者经常遇到这样的问题&#xff0c;即自定义 Python 模板后&#xff0c;在其它文件中用 import&#xff08;或 from...import&#xff09; 语句引入该文件时&#xff0c;Python 解释器同时如下错误&#xff1a;ModuleNotFoundError: No module named 模块名意思是 Pytho…

45.在ROS中实现global planner(1)

前文move_base介绍&#xff08;4&#xff09;简单介绍move_base的全局路径规划配置&#xff0c;接下来我们自己实现一个全局的路径规划 1. move_base规划配置 ROS1的move_base可以配置选取不同的global planner和local planner&#xff0c; 默认move_base.cpp#L70中可以看到是…

Vue3电商项目实战-分类模块1【01-顶级类目-面包屑组件-初级、02-顶级类目-面包屑组件-高级】

文章目录01-顶级类目-面包屑组件-初级02-顶级类目-面包屑组件-高级01-顶级类目-面包屑组件-初级 目的&#xff1a; 封装一个简易的面包屑组件&#xff0c;适用于两级场景。 大致步骤&#xff1a; 准备静态的 xtx-bread.vue 组件定义 props 暴露 parentPath parentName 属性&am…

[oeasy]python0081_ANSI序列由来_终端机_VT100_DEC_VT选项_终端控制序列

更多颜色 回忆上次内容 上次 首先了解了RGB颜色设置可以把一些抽象的色彩名字 落实到具体的 RGB颜色 计算机所做的一切 其实就是量化、编码把生活的一切都进行数字化 标准 是ANSI制定的 这个ANSI 又是 怎么来的 呢&#xff1f;&#xff1f;&#x1f914; 由来 ANSI 听起…

【c++设计模式】——模板方法模式

模板方法模式的定义 定义一个操作中的算法对象的骨架&#xff08;稳定&#xff09;&#xff0c;而将一些步骤延迟到子类&#xff08;定义一个虚函数&#xff0c;让子类去实现&#xff09;&#xff0c;template method使得子类可以不改变&#xff08;复用&#xff09;一个算法结…

can协议介绍

目录 1 can协议介绍 1.1can协议 1.2 CAN协议特点 2.CAN FD 2.1 CAN FD协议简介 2.2 CAN FD协议特点 3.LIN 3.1 LIN总线简介 3.2 LIN总线特点 4. FlexRay 4.1 FlexRay简介 4.2 FlexRay特点 5. MOST 6.Ethernet 7 总结&#xff1a; 1 can协议介绍 1.1can协议 CAN…

Linux---Linux是什么

Linux 便成立的核心网站&#xff1a; http://www.kernel.org Linux是什么 Linux 就是一套操作系统 Linux 就是核心与系统呼叫接口那两层 软件移植&#xff1a;如果能够参考硬件的功能函数并据以修改你的操作系统程序代码&#xff0c; 那经过改版后的操作系统就能够在另一个硬…

Spring Boot 整合定时任务完成 从0 到1

Java 定时任务学习 定时任务概述 > 定时任务的应用场景非常广泛, 如果说 我们想要在某时某地去尝试的做某件事 就需要用到定时任务来通知我们 &#xff0c;大家可以看下面例子 如果需要明天 早起&#xff0c;哪我们一般会去定一个闹钟去通知我们, 而在编程中 有许许多多的…

ssm高校功能教室预约系统java idea maven

本网站所实现的是一个高校功能教室预约系统&#xff0c;该系统严格按照需求分析制作相关模块&#xff0c;并利用所学知识尽力完成&#xff0c;但是本人由于学识浅薄&#xff0c;无法真正做到让该程序可以投入市场使用&#xff0c;仅仅简单实现部分功能&#xff0c;希望日后还能…

springboot集成Redis

springboot集成Redis1 windows平台安装Redis2 引入依赖3 修改配置文件4 启动类添加注解5 指定缓存哪个方法6 配置Redis的超时时间小BUG测试对于项目中一些访问量较大的接口&#xff0c;配置上Redis缓存&#xff0c;提升系统运行速度。1 windows平台安装Redis github.com/Micro…

谈一谈API接口开发

做过开发的程序猿&#xff0c;基本都写过接口&#xff0c;写接口不算难事&#xff0c;与接口交互的对象核对好接口的地址、请求参数和响应参数即可&#xff0c;我在作为面试官去面试开发人员的时候&#xff0c;有时候会问这个问题&#xff0c;但相当多的一部分人并没有深入的考…

BERT(NAACL 2019)-NLP预训练大模型论文解读

文章目录摘要算法BERT预训练Masked LMNSPFine-tune BERT实验GLUESQuAD v1.1SQuAD v2.0SWAG消融实验预训练任务影响模型大小影响BERT基于特征的方法结论论文&#xff1a; 《BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding》github&#xff…

QT+OpenGL 摄像机

QTOpenGL 摄像机 本篇完整工程见gitee:QtOpenGL 对应点的tag&#xff0c;由turbolove提供技术支持&#xff0c;您可以关注博主或者私信博主 OpenGL本身没有摄像机的定义&#xff0c;但是我们可以通过把场景中的所有物体往相反方向移动的方式来模拟出摄像机&#xff0c;产生一…

Linux内核启动(2,0.11版本)内核启动前的苦力活与内核启动

内核启动前的工作 在上一章的内容中&#xff0c;我们跳转到了setup.s的代码部分&#xff0c;这章我们先讲一讲setup做了什么吧 entry start start:! ok, the read went well so we get current cursor position and save it for ! posterity.mov ax,#INITSEG ! this is done …