手撸XXL-JOB(四)——远程调用定时任务

news2025/1/16 11:14:32

Java Socket网络编程

网络编程是Java编程中的重要组成部分,包括服务端和客户端两部分内容。Socket是Java网络编程的基本组件之一,用于在应用程序之间提供双向通信,Socket提供了一种标准的接口,允许应用程序通过网络发送和接收数据,在Java中,Socket可以分为客户端Socket和服务端Socket两种类型。
客户端Socket:客户端 Socket 用于与服务端 Socket 进行通信。客户端 Socket 通过指定服务端的 IP 地址和端口号,连接到服务端 Socket,然后发送数据到服务端 Socket。
服务端Socket:服务端 Socket 用于接收来自客户端 Socket 的连接请求,并在连接成功后,与客户端 Socket 进行通信。服务端 Socket 首先需要创建一个 ServerSocket 对象,并通过 bind 方法绑定到一个本地端口,然后等待客户端 Socket 的连接请求。
下面是Socket的一个示例:
服务端:

package org.example.demo1;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {
    public static void main(String[] args) {
        try {
            ServerSocket serverSocket = new ServerSocket(8000);
            System.out.println("Server started, waiting for client...");

            Socket socket = serverSocket.accept();
            System.out.println("Client connected.");

            BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            PrintWriter out = new PrintWriter(socket.getOutputStream(), true);

            String message;
            while ((message = in.readLine()) != null) {
                System.out.println("Client:" + message);
                out.println("Server received message:" + message);
            }

            in.close();
            out.close();
            socket.close();
            serverSocket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

客户端:

package org.example.demo1;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

public class Client {
    public static void main(String[] args) {
        try {
            Socket socket = new Socket("localhost", 8000);
            System.out.println("Connected to server.");

            BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            PrintWriter out = new PrintWriter(socket.getOutputStream(), true);

            BufferedReader consoleIn = new BufferedReader(new InputStreamReader(System.in));
            String message;
            while ((message = consoleIn.readLine()) != null) {
                out.println(message);
                System.out.println("Server:" + in.readLine());
            }
            consoleIn.close();
            in.close();
            out.close();
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

首先启动服务端,然后启动客户端,在客户端的控制台,输入数据,服务端能接收到数据并返回对应的响应。
image.png
image.png

远程调用定时任务

首先,我们创建两个模块,core模块包含yang-job的一些核心内容,比如IJobExecutor执行器、JobExecuteRequest执行器请求等;client模块依赖core模块,并封装和socket客户端调用相关的一些内容。
然后创建一个sample1模块,用于演示。
image.png

core模块

image.png
core目前定义了定时任务执行类和其入参、出参等信息,其中,YangJobTransferDTO包含任务类路径和任务请求,如下所示:

package com.yang.job.dto;

import com.yang.job.execute.YangJobExecuteRequest;

import java.io.Serializable;

public class YangJobTransferDTO implements Serializable {
    private String className;

    private YangJobExecuteRequest yangJobExecuteRequest;

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public YangJobExecuteRequest getYangJobExecuteRequest() {
        return yangJobExecuteRequest;
    }

    public void setYangJobExecuteRequest(YangJobExecuteRequest yangJobExecuteRequest) {
        this.yangJobExecuteRequest = yangJobExecuteRequest;
    }
}

client模块

image.png
client模块定义了客户端所需要的一些类,其中,YangJob为注解类,对于每一个定时任务,需要加上YangJob注解,才能被正确调用。

package com.yang.job.client.annotations;

import java.lang.annotation.*;

@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface YangJob {
}

YangJobClientProperty为配置信息类,目前需要两个配置信息,客户端socket的ip和端口号

package com.yang.job.client.configuration;


import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class YangJobClientProperty {
    @Value("${yang-job.executor.port}")
    private Integer port;

    @Value("${yang-job.executor.ip}")
    private String ip;


    public Integer getPort() {
        return port;
    }

    public void setPort(Integer port) {
        this.port = port;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }
}

YangJobClientPostProcessor在SpringBoot加载完毕后,扫描bean,将实现IYongJobExecutor的bean,注册到YangJobClientManager的map中,方便后续调用

package com.yang.job.client.schema;


import com.yang.job.client.annotations.YangJob;
import com.yang.job.client.YangJobClientManager;
import com.yang.job.execute.IYangJobExecutor;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;

public class YangJobClientPostProcessor implements BeanPostProcessor {
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (!(bean instanceof IYangJobExecutor)) {
            return bean;
        }
        YangJob annotation = bean.getClass().getAnnotation(YangJob.class);
        if (annotation == null) {
            return bean;
        }
        YangJobClientManager.putJobExecutor(bean.getClass().getName(), (IYangJobExecutor) bean);
        return bean;
    }
}

YangJobClientManager负责监听端口和管理定时任务的执行,它会监听我们配置的yang-job.execute.port端口号,然后当接收到消息时,将消息转为入参,并取出对应的定时任务执行类,执行对应的代码。

package com.yang.job.client;


import com.alibaba.fastjson.JSONObject;
import com.yang.job.client.dto.YangJobClientPropertyDTO;
import com.yang.job.core.dto.YangJobTransferDTO;
import com.yang.job.core.dto.ResultT;
import com.yang.job.core.execute.IYangJobExecutor;
import com.yang.job.core.execute.YangJobExecuteRequest;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class YangJobClientManager {
    private static Map<String, IYangJobExecutor> className2JobExecutorMap = new ConcurrentHashMap<>();

    private YangJobClientPropertyDTO yangJobClientPropertyDTO;

    private ServerSocket serverSocket;

    public YangJobClientManager(YangJobClientPropertyDTO yangJobClientPropertyDTO) {
        this.yangJobClientPropertyDTO = yangJobClientPropertyDTO;
    }

    public void init() {
        Integer port = this.yangJobClientPropertyDTO.getPort();
        try {
            this.serverSocket = new ServerSocket(port);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        System.out.println("init success============");
        new Thread(() -> {
            while (true) {
                try {
                    Socket socket = serverSocket.accept();
                    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                    PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);
                    String params = bufferedReader.readLine();
                    YangJobTransferDTO yangJobTransferDTO = JSONObject.parseObject(params, YangJobTransferDTO.class);
                    System.out.println(yangJobTransferDTO);
                    String className = yangJobTransferDTO.getClassName();
                    YangJobExecuteRequest yangJobExecuteRequest = yangJobTransferDTO.getYangJobExecuteRequest();
                    IYangJobExecutor jobExecutor = getJobExecutor(className);
                    if (jobExecutor != null) {
                        ResultT response = jobExecutor.execute(yangJobExecuteRequest);
                        printWriter.println(JSONObject.toJSONString(response));
                    }
                    bufferedReader.close();
                    printWriter.close();
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                if (serverSocket.isClosed() || serverSocket == null) {
                    break;
                }
            }
        }).start();
    }

    public void shutdown() {
        if (this.serverSocket != null) {
            try {
                this.serverSocket.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public YangJobClientPropertyDTO getYangJobPropertyDTO() {
        return this.yangJobClientPropertyDTO;
    }

    public static void putJobExecutor(String className, IYangJobExecutor iJobExecutor) {
        className2JobExecutorMap.put(className, iJobExecutor);
    }

    public static IYangJobExecutor getJobExecutor(String className) {
        return className2JobExecutorMap.get(className);
    }

}

YangJobClientContext为客户端的上下文,负责监听SpringBoot刷新消息和关闭消息,并执行对应的操作。

package com.yang.job.client;


import com.yang.job.client.utils.SpringContextUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ApplicationContextEvent;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;

public class YangJobClientContext implements ApplicationListener<ApplicationContextEvent> {
    private static YangJobClientContext instance;

    private ApplicationContext applicationContext;

    @Override
    public void onApplicationEvent(ApplicationContextEvent event) {
        if (event instanceof ContextRefreshedEvent) {
            System.out.println("刷新了=========");
            YangJobClientContext.instance = this;
            instance.applicationContext = applicationContext;
            init();
        } else if (event instanceof ContextClosedEvent) {
            System.out.println("销毁了=========");
            shutdown();
        }
    }

    private void init() {
        YangJobClientManager yangJobClientManager = SpringContextUtils.getBeanOfType(YangJobClientManager.class);
        yangJobClientManager.init();
    }

    private void shutdown() {
        YangJobClientManager yangJobClientManager = SpringContextUtils.getBeanOfType(YangJobClientManager.class);
        yangJobClientManager.shutdown();
    }
}

YangJobClientConfiguration为配置类,负责对YangJobClientPostProcessor、YangJobClientManager和YangJobClientContext的统一配置管理。

package com.yang.job.client.configuration;


import com.yang.job.client.YangJobClientManager;
import com.yang.job.client.YangJobClientContext;
import com.yang.job.client.dto.YangJobClientPropertyDTO;
import com.yang.job.client.schema.YangJobClientPostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class YangJobClientConfiguration {
    @Autowired
    private YangJobClientProperty yangJobClientProperty;

    @Bean
    public YangJobClientPostProcessor yangJobPostProcessor() {
        return new YangJobClientPostProcessor();
    }

    @Bean
    public YangJobClientManager yangJobClientManager() {
        YangJobClientPropertyDTO yangJobClientPropertyDTO = new YangJobClientPropertyDTO();
        yangJobClientPropertyDTO.setIp(yangJobClientProperty.getIp());
        yangJobClientPropertyDTO.setPort(yangJobClientProperty.getPort());
        return new YangJobClientManager(yangJobClientPropertyDTO);
    }

    @Bean
    public YangJobClientContext yangJobContext() {
        return new YangJobClientContext();
    }
}

最后,为了使引入client依赖的应用,能自动装配我们提供的bean,我们在resources目录下创建META-INF目录,在该目录下创建spring.factories文件,文件内容如下:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.yang.job.client.utils.SpringContextUtils,\
  com.yang.job.client.configuration.YangJobClientProperty,\
  com.yang.job.client.configuration.YangJobClientConfiguration
sample1

我们创建一个sample1项目,引入spring-boot-starter-web依赖和yang-client,yang-core的依赖

  <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.yang</groupId>
            <artifactId>yang-job-core</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.yang</groupId>
            <artifactId>yang-job-client</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

创建启动类

package com.yang.job.sample1;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class YangJobSample1App {
    public static void main(String[] args) {
        SpringApplication.run(YangJobSample1App.class, args);
    }
}

创建一个任务类:

package com.yang.job.sample1.task;

import com.yang.job.client.annotations.YangJob;
import com.yang.job.dto.ResultT;
import com.yang.job.execute.IYangJobExecutor;
import com.yang.job.execute.YangJobExecuteRequest;
import org.springframework.stereotype.Component;

@Component
@YangJob
public class TestTask1 implements IYangJobExecutor {
    @Override
    public ResultT execute(YangJobExecuteRequest yangJobExecuteRequest) {
        System.out.println("开启定时任务了,入参为:" + yangJobExecuteRequest);
        return ResultT.success();
    }
}

添加配置文件,因为client模块的YangJobClientProperty需要有yang-job.executor.port和yang-job.executor.ip这两个配置,如果我们的配置文件中,缺少这些配置,会导致报错,无法启动项目。

spring:
  application:
    name: YangJobSample1App
yang-job:
  executor:
    port: 9999
    ip: 127.0.0.1
server:
  port: 8001
测试

我们先启动刚才的sample1项目,然后执行下列代码,来远程调用TestTask1方法执行类。

 public static void main(String[] args) {
        try {
            Socket socket = new Socket("127.0.0.1", 9999);
            System.out.println("链接成功=============");
            PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            YangJobExecuteRequest yangJobExecuteRequest = new YangJobExecuteRequest();
            yangJobExecuteRequest.setJobId("1");
            yangJobExecuteRequest.addParam("num", "1");
            YangJobTransferDTO yangJobTransferDTO = new YangJobTransferDTO();
            yangJobTransferDTO.setClassName("com.yang.job.sample1.task.TestTask1");
            yangJobTransferDTO.setYangJobExecuteRequest(yangJobExecuteRequest);

            printWriter.println(JSONObject.toJSONString(yangJobTransferDTO));
            System.out.println("response:" + bufferedReader.readLine());
            bufferedReader.close();
            printWriter.close();
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

执行结果如下,说明我们能成功地进行远程调用。
image.png
image.png

添加远程任务

domain层

在上一篇文章中,我们操作的任务,都是本地任务,现在我们需要对远程任务进行操作,为了区分任务类型,我们首先在domain层添加一个任务类型枚举

package com.yang.job.admin.domain.enums;

public enum JobTypeEnum {
    LOCAL("local", "本地任务"),
    REMOTE("remote", "远程任务");

    private String name;

    private String description;

    JobTypeEnum(String name, String description) {
        this.name = name;
        this.description = description;
    }

    public String getName() {
        return name;
    }

    public String getDescription() {
        return description;
    }

    public static JobTypeEnum getJobTypeByName(String name) {
        for (JobTypeEnum value : values()) {
            if (value.getName().equals(name)) {
                return value;
            }
        }
        return null;
    }
}

然后修改YangJobModel,添加上任务类型枚举和远程任务信息

package com.yang.job.admin.domain.model;


import com.yang.job.admin.client.dto.common.BusinessException;
import com.yang.job.admin.client.dto.common.ErrorCode;
import com.yang.job.admin.domain.enums.JobExecuteStrategyEnum;
import com.yang.job.admin.domain.enums.JobTypeEnum;
import com.yang.job.admin.domain.event.SaveJobPostEvent;
import com.yang.job.admin.domain.event.SubmitJobPostEvent;
import com.yang.job.admin.domain.event.UpdateJobPostEvent;
import com.yang.job.admin.domain.valueobject.RemoteExecutorMessage;
import com.yang.job.admin.infra.event.EventCenter;
import com.yang.job.admin.infra.job.YangJobManager;
import com.yang.job.admin.infra.job.request.YangJobSubmitParam;
import com.yang.job.admin.infra.utils.CronUtils;
import com.yang.job.admin.infra.utils.SpringContextUtils;
import lombok.Data;

import java.io.Serializable;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

@Data
public class YangJobModel implements Serializable {
    private Integer jobId;

    private String jobName;

    private String description;

    private String cron;

    private String executeClassPath;

    private Runnable runnable;

    private JobExecuteStrategyEnum executeStrategy;

    private JobTypeEnum jobType;

    private RemoteExecutorMessage remoteExecutorMessage;

    private Integer enable;

    private Integer open;

    private Date createTime;

    private Date updateTime;

    private Map<String, String> featureMap = new HashMap<>();

    private Map<String, String> executeParamMap = new HashMap<>();

    public boolean isEnable() {
        if (this.enable == null) {
            return false;
        }
        return this.enable == 1;
    }

    public boolean isOpen() {
        if (!isEnable()) {
            return false;
        }
        if (this.open == null) {
            return false;
        }
        return this.open == 1;
    }

    public boolean isClose() {
        return !isOpen();
    }

    public boolean isLocalJob() {
        return JobTypeEnum.LOCAL == this.jobType;
    }

    public boolean isRemoteJob() {
        return JobTypeEnum.REMOTE == this.jobType;
    }

    public void setExecuteStrategy(JobExecuteStrategyEnum jobExecuteStrategyEnum) {
        if (jobExecuteStrategyEnum == null) {
            throw new BusinessException(ErrorCode.EXECUTE_STRATEGY_NO_EXIST);
        }
        this.executeStrategy = jobExecuteStrategyEnum;
    }


    public void submitJob() {
        YangJobSubmitParam yangJobSubmitParam = convert2YangJobSubmitParam();
        YangJobManager yangJobManager = getYangJobManager();
        yangJobManager.submitJob(yangJobSubmitParam);
        // 提交任务后,发送提交任务后置事件
        SubmitJobPostEvent submitJobPostEvent = new SubmitJobPostEvent(yangJobSubmitParam);
        getEventCenter().postEvent(submitJobPostEvent);
    }

    public void cancelJob() {
        YangJobManager yangJobManager = getYangJobManager();
        yangJobManager.cancelJob(this.jobId);
    }

    private YangJobSubmitParam convert2YangJobSubmitParam() {
        YangJobSubmitParam yangJobBuildParam = new YangJobSubmitParam();
        yangJobBuildParam.setJobId(this.jobId);
        yangJobBuildParam.setRunnable(this.runnable);
        ZonedDateTime nextExecutionTime = CronUtils.nextExecutionTime(this.cron, ZonedDateTime.now());
        ZonedDateTime nextNextExecutionTime = CronUtils.nextExecutionTime(this.cron, nextExecutionTime);
        long nowEochMill = ZonedDateTime.now().toInstant().toEpochMilli();
        long executeEochMill = nextExecutionTime.toInstant().toEpochMilli();
        long secondExecuteEochMill = nextNextExecutionTime.toInstant().toEpochMilli();
        yangJobBuildParam.setInitialDelay((int)(executeEochMill - nowEochMill) / 1000);
        yangJobBuildParam.setPeriod((int)(secondExecuteEochMill - executeEochMill) / 1000);
        yangJobBuildParam.setJobExecuteStrategy(this.executeStrategy);
        return yangJobBuildParam;
    }

    public void postSaveJobEvent() {
        SaveJobPostEvent saveJobPostEvent = new SaveJobPostEvent(this.jobId);
        getEventCenter().asyncPostEvent(saveJobPostEvent);
    }

    public void postUpdateJobEvent() {
        UpdateJobPostEvent updateJobPostEvent = new UpdateJobPostEvent(this.jobId);
        getEventCenter().asyncPostEvent(updateJobPostEvent);
    }

    public void postDeleteJobEvent() {
        UpdateJobPostEvent updateJobPostEvent = new UpdateJobPostEvent(this.jobId);
        getEventCenter().asyncPostEvent(updateJobPostEvent);
    }

    private YangJobManager getYangJobManager() {
        return SpringContextUtils.getBeanOfType(YangJobManager.class);
    }

    private EventCenter getEventCenter() {
        return SpringContextUtils.getBeanOfType(EventCenter.class);
    }

}

远程任务信息类:

package com.yang.job.admin.domain.valueobject;

import lombok.Data;

import java.io.Serializable;

@Data
public class RemoteExecutorMessage implements Serializable {
    private String ip;

    private Integer port;
}

接着我们添加一个features枚举,用于记录映射features字段中各个key表示的含义,因为我们现在表的设计中没有任务类型字段和远程信息相关的字段,所以会将这些信息添加到features字段中

package com.yang.job.admin.domain.enums;

public enum JobModelFeatureEnum {
    JOB_TYPE("jobType", "任务类型"),
    REMOTE_EXECUTOR_IP("executorIp", "执行器ip"),
    REMOTE_EXECUTOR_PORT("executorPort", "执行器端口"),
    REMOTE_EXECUTOR_MESSAGE("r_executor_m", "远程执行器的信息");

    private String name;

    private String description;

    JobModelFeatureEnum(String name, String description) {
        this.name = name;
        this.description = description;
    }


    public String getName() {
        return name;
    }

    public String getDescription() {
        return description;
    }
}

client层

我们修改原先的NewYangJobCommand类,加上任务类型属性

package com.yang.job.admin.client.dto.command;


import lombok.Data;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

@Data
public class NewYangJobCommand implements Serializable {
    private String jobName;

    private String description;

    private String cron;

    private String executeStrategy;

    private String jobType;

    private String executeClassPath;

    private Integer open;

    private Map<String, String> params = new HashMap<>();
}

然后修改YangJobDTO类,也加上jobType属性

package com.yang.job.admin.client.dto;


import lombok.Data;

import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

@Data
public class YangJobDTO implements Serializable {
    private Integer jobId;

    private String jobName;

    private String description;

    private String cron;

    private String executeStrategy;

    private String executeClassPath;

    private String jobType;

    private Integer enable;

    private Integer open;

    private Date createTime;

    private Date updateTime;

    private Map<String, String> featureMap = new HashMap<>();

    private Map<String, String> executeParamMap = new HashMap<>();
}
application层

接着修改YangJobApplicationService类的convertYangJobModel方法,将jobType任务类型和远程任务信息添加到YangJobModel中

 private YangJobModel convert2YangJobModel(NewYangJobCommand newYangJobCommand) {
        String jobType = newYangJobCommand.getJobType();
        JobTypeEnum jobTypeEnum = JobTypeEnum.getJobTypeByName(jobType);
        if (jobType == null) {
            throw new BusinessException(ErrorCode.PARAM_VALID_ERROR);
        }
        YangJobModel yangJobModel = new YangJobModel();
        yangJobModel.setJobName(newYangJobCommand.getJobName());
        yangJobModel.setDescription(newYangJobCommand.getDescription());
        yangJobModel.setCron(newYangJobCommand.getCron());
        yangJobModel.setOpen(newYangJobCommand.getOpen());
        yangJobModel.setExecuteStrategy(JobExecuteStrategyEnum.getJobExecuteStrategyByName(newYangJobCommand.getExecuteStrategy()));
        yangJobModel.setExecuteClassPath(newYangJobCommand.getExecuteClassPath());
        yangJobModel.setExecuteParamMap(newYangJobCommand.getParams());
        yangJobModel.setJobType(jobTypeEnum);
        if (jobTypeEnum == JobTypeEnum.REMOTE) {
            String ip = newYangJobCommand.getParams().get(JobModelFeatureEnum.REMOTE_EXECUTOR_IP.getName());
            String port = newYangJobCommand.getParams().get(JobModelFeatureEnum.REMOTE_EXECUTOR_PORT.getName());
            if (ip == null || port == null) {
                throw new BusinessException(ErrorCode.PARAM_VALID_ERROR);
            }
            RemoteExecutorMessage remoteExecutorMessage = new RemoteExecutorMessage();
            remoteExecutorMessage.setIp(ip);
            remoteExecutorMessage.setPort(Integer.valueOf(port));
            yangJobModel.setRemoteExecutorMessage(remoteExecutorMessage);
        } else {
            if (yangJobModel.getExecuteClassPath() == null || yangJobModel.getExecuteClassPath().isEmpty()) {
                throw new BusinessException(ErrorCode.UN_LEGAL_CLASS_PATH);
            }
            try {
                Class.forName(yangJobModel.getExecuteClassPath());
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
                throw new BusinessException(ErrorCode.UN_LEGAL_CLASS_PATH);
            }
        }
        return yangJobModel;
    }
infra层

最后修改基础设施层,首先修改YangJobModelConvertor类,将RemoteMessage和JobType转化到features中,以及从features中取出

package com.yang.job.admin.infra.gatewayimpl.repository.convertor;

import com.alibaba.fastjson.JSONObject;
import com.yang.job.admin.domain.enums.JobExecuteStrategyEnum;
import com.yang.job.admin.domain.enums.JobModelFeatureEnum;
import com.yang.job.admin.domain.enums.JobTypeEnum;
import com.yang.job.admin.domain.model.YangJobModel;
import com.yang.job.admin.domain.valueobject.RemoteExecutorMessage;
import com.yang.job.admin.infra.data.YangJobData;
import com.yang.job.admin.infra.job.thread.RemoteJobExecuteThread;
import com.yang.job.admin.infra.utils.FeaturesUtils;
import com.yang.job.core.dto.YangJobTransferDTO;
import com.yang.job.core.execute.IYangJobExecutor;
import com.yang.job.core.execute.YangJobExecuteRequest;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class YangJobModelConvertor {
    public YangJobData convert2Data(YangJobModel yangJobModel) {
        if (yangJobModel == null) {
            return null;
        }
        YangJobData yangJobData = new YangJobData();
        yangJobData.setJobId(yangJobModel.getJobId());
        yangJobData.setJobName(yangJobModel.getJobName());
        yangJobData.setDescription(yangJobModel.getDescription());
        yangJobData.setCron(yangJobModel.getCron());
        yangJobData.setExecuteClassPath(yangJobModel.getExecuteClassPath());
        yangJobData.setEnable(yangJobModel.getEnable());
        yangJobData.setOpen(yangJobModel.getOpen());
        yangJobData.setCreateTime(yangJobModel.getCreateTime());
        yangJobData.setUpdateTime(yangJobModel.getUpdateTime());
        Map<String, String> featureMap = yangJobModel.getFeatureMap();
        featureMap.put(JobModelFeatureEnum.JOB_TYPE.getName(), yangJobModel.getJobType().getName());
        featureMap.put(JobModelFeatureEnum.REMOTE_EXECUTOR_MESSAGE.getName(), JSONObject.toJSONString(yangJobModel.getRemoteExecutorMessage()));
        yangJobData.setFeatures(FeaturesUtils.convert2Features(featureMap));
        yangJobData.setExecuteParams(FeaturesUtils.convert2Features(yangJobModel.getExecuteParamMap()));
        yangJobData.setExecuteStrategy(yangJobModel.getExecuteStrategy().getName());
        return yangJobData;
    }

    public YangJobModel convert2Model(YangJobData yangJobData) {
        if (yangJobData == null) {
            return null;
        }
        YangJobModel yangJobModel = new YangJobModel();
        yangJobModel.setJobId(yangJobData.getJobId());
        yangJobModel.setDescription(yangJobData.getDescription());
        yangJobModel.setCron(yangJobData.getCron());
        yangJobModel.setJobName(yangJobData.getJobName());
        yangJobModel.setExecuteClassPath(yangJobData.getExecuteClassPath());
        yangJobModel.setEnable(yangJobData.getEnable());
        yangJobModel.setOpen(yangJobData.getOpen());
        yangJobModel.setCreateTime(yangJobData.getCreateTime());
        yangJobModel.setUpdateTime(yangJobData.getUpdateTime());
        yangJobModel.setFeatureMap(FeaturesUtils.convert2FeatureMap(yangJobData.getFeatures()));
        yangJobModel.setExecuteParamMap(FeaturesUtils.convert2FeatureMap(yangJobData.getExecuteParams()));
        JobExecuteStrategyEnum executeStrategy = JobExecuteStrategyEnum.getJobExecuteStrategyByName(yangJobData.getExecuteStrategy());
        if (executeStrategy == null) {
            throw new RuntimeException("执行策略有误!");
        }

        JobTypeEnum jobType = JobTypeEnum.getJobTypeByName(yangJobModel.getFeatureMap().get(JobModelFeatureEnum.JOB_TYPE.getName()));
        yangJobModel.setJobType(jobType);
        String remoteMessageStr = yangJobModel.getFeatureMap().get(JobModelFeatureEnum.REMOTE_EXECUTOR_MESSAGE.getName());
        RemoteExecutorMessage remoteExecutorMessage = JSONObject.parseObject(remoteMessageStr, RemoteExecutorMessage.class);
        yangJobModel.setRemoteExecutorMessage(remoteExecutorMessage);

        yangJobModel.setExecuteStrategy(executeStrategy);
        yangJobModel.setRunnable(buildRunnable(yangJobModel));

        return yangJobModel;
    }

    private Runnable buildRunnable(YangJobModel yangJobModel) {
        if (yangJobModel.isLocalJob()) {
            String executeClassPath = yangJobModel.getExecuteClassPath();
            try {
                Class<?> aClass = Class.forName(executeClassPath);
                if (!IYangJobExecutor.class.isAssignableFrom(aClass)) {
                    throw new RuntimeException("该类必须实现IYangJobExecutor接口");
                }
                IYangJobExecutor executor = (IYangJobExecutor) aClass.newInstance();
                YangJobExecuteRequest yangJobExecuteRequest = convert2YangJobExecuteRequest(yangJobModel);
                Runnable runnable = () -> executor.execute(yangJobExecuteRequest);
                return runnable;
            } catch (InstantiationException | IllegalAccessException e) {
                e.printStackTrace();
            } catch (ClassNotFoundException e) {
                System.out.println(String.format("%s 类路径对应的类不存在", executeClassPath));
                e.printStackTrace();
            }
        } else {
            RemoteExecutorMessage remoteExecutorMessage = yangJobModel.getRemoteExecutorMessage();
            String executeClassPath = yangJobModel.getExecuteClassPath();

            YangJobTransferDTO yangJobTransferDTO = new YangJobTransferDTO();
            yangJobTransferDTO.setClassName(executeClassPath);

            YangJobExecuteRequest yangJobExecuteRequest = convert2YangJobExecuteRequest(yangJobModel);
            yangJobTransferDTO.setYangJobExecuteRequest(yangJobExecuteRequest);

            return new RemoteJobExecuteThread(remoteExecutorMessage, yangJobTransferDTO);
        }
        return null;
    }

    private static YangJobExecuteRequest convert2YangJobExecuteRequest(YangJobModel yangJobModel) {
        YangJobExecuteRequest yangJobExecuteRequest = new YangJobExecuteRequest();
        yangJobExecuteRequest.setJobId(yangJobModel.getJobId().toString());
        yangJobExecuteRequest.setParams(yangJobModel.getExecuteParamMap());
        return yangJobExecuteRequest;
    }
}

然后添加一个RemoteJobExecuteThread类,该类实现runnable接口,当我们的任务类型为远程调用时,其YangJobModel的runnable属性为remoteJobExecuteThread类

package com.yang.job.admin.infra.job.thread;

import com.alibaba.fastjson.JSONObject;
import com.yang.job.admin.domain.valueobject.RemoteExecutorMessage;
import com.yang.job.core.dto.YangJobTransferDTO;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;

public class RemoteJobExecuteThread implements Runnable {
    private YangJobTransferDTO yangJobTransferDTO;

    private RemoteExecutorMessage remoteExecutorMessage;

    public RemoteJobExecuteThread(RemoteExecutorMessage remoteExecutorMessage, YangJobTransferDTO yangJobTransferDTO) {
        this.remoteExecutorMessage = remoteExecutorMessage;
        this.yangJobTransferDTO = yangJobTransferDTO;
    }

    @Override
    public void run() {
        try {
            String ip = remoteExecutorMessage.getIp();
            Integer port = remoteExecutorMessage.getPort();
            Socket socket = new Socket(ip, port);
            PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));

            printWriter.println(JSONObject.toJSONString(yangJobTransferDTO));

            bufferedReader.close();
            printWriter.close();
            socket.close();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

测试

我们先启动之前的sample1项目,然后启动yang-job-admin,调用http://localhost:8080/job添加任务,请求体如下:

{
  "jobName": "RemoteJobExecutor",
"description":"RemoteJobExecutor",
  "cron": "0/10 * * * * ?",
  "executeStrategy": "withFixedDelay",
  "executeClassPath": "com.yang.job.sample1.task.TestTask1",
"open":1,
"jobType":"remote",
"params":{
"executorIp":"127.0.0.1",
"executorPort":"9999"
}
}

image.png
添加成功后,我们查看Sample1项目的控制台,可以看到,每10秒,这个TestTask1任务会被调用一次
image.png

参考文章

https://www.yihuo.tech/programming/server-stack/exploring-the-java-network-programming-paradigm-socket-udp-nio-and-netty-in-focus/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1677564.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

分体工业读写器的适用场景有哪些?

工业读写器根据设计方式不同&#xff0c;可分为一体式读写器和分体式读写器&#xff0c;不同读写器特点不同&#xff0c;适用场景也不同&#xff0c;下面我们就一起来了解一下超高频分体读写器适用场景有哪些。 超高频分体读写器介绍 超高频分体读写器是一种射频识别(RFID)设…

SpringAI应用开发

一、人工智能简述 四次工业革命推动了人类社会发展和变革&#xff1a; 蒸汽时代&#xff0c;发生在18世纪60年代~19世纪中期&#xff08;大约是1760年到1860年&#xff09;&#xff0c;这一时期的特点是机械化生产和大规模生产。电气时代&#xff0c;发生在19世纪下半叶~20世纪…

一篇文章搞懂 SDN中Minint和Ryu的安装及使用

SDN 一、SDN介绍 一&#xff0e; 什么是SDN? SDN字面意思是软件定义网络&#xff0c;其试图摆脱硬件对网络架构的限制&#xff0c;这样便可以像升级、安装软件一样对网络进行修改&#xff0c;便于更多的APP&#xff08;应用程序&#xff09;能够快速部署到网络上。 如果把…

pcdn边缘云常见sla有哪些?如何避免被白嫖

PCDN&#xff08;Point-to-Point Content Delivery Network&#xff09;边缘云常见的SLA&#xff08;Service Level Agreement&#xff09;规则包括高峰期离线、服务时间、重传延时、限速等。这些规则是为了保证服务质量和用户体验。下面将详细解释这些规则&#xff0c;并提供一…

51单片机:点亮一个LED灯

1.新建工程 选择AT89C52&#xff0c;在Atmel下显示的是See Microchip 并不需要添加启动文件到文件夹中。 添加main.c文件&#xff0c;c比cpp效率高&#xff0c;.asm汇编即更底层 程序编写好后 nop(); 该函数在这个头文件里面 #include <INTRINS.H> #include <R…

flex 盒子布局 align-items: start; flex-wrap: wrap; justify-content: space-between;

flex 盒子布局 align-items: start; flex-wrap: wrap; justify-content: space-between; 总盒子 .allboc {display: flex;width: 100%;align-items: start;flex-wrap: wrap;justify-content: space-between; }左边 justify-content: flex-start; .blog-articles {display: fl…

3588 pwm android12 的操作,包含 NDK native C++

问题&#xff1a; 客户需要在android12 的界面上操作板卡上的 PWM 蜂鸣器设备。 过程&#xff1a; 1 了解一下 3588 android12 源码的 关于PWM 的驱动。 设备树找不到 pwm 但是&#xff0c; 还不知道&#xff0c;android12 最终包含的 设备树是哪个&#xff0c;但是经过我的…

鸿蒙OS开发:【Stage模型应用程序包结构】

Stage模型应用程序包结构 为了让开发者能对应用程序包在不同阶段的形态更有清晰的认知&#xff0c;分别对开发态、编译态、发布态的应用程序结构展开介绍。 开发态包结构 在DevEco Studio上[创建一个项目工程]&#xff0c;并尝试创建多个不同类型的Module。根据实际工程中的…

数据分析(二)——导入外部数据,导入Excel数据,CSV文件,txt文件,HTML网页,数据抽取,DataFrame对象的loc属性与iloc属性

一.导入外部数据 1.导入.xIs或.xIsx文件 pd.read_ excel(io,sheet_ name,header) 1.1常用参数说明 ●io:表示.xIs或.xIsx文件路径或类文件对象 ●sheet name:表示工作表&#xff0c;取值如下表所示 ●header:默认值为0&#xff0c;取第一行的值为列名&#xff0c;数据为除列…

C++ 结构体内存对齐

定义了两个结构体 typedef struct Cmd {uint8_t ua;uint8_t ub;uint8_t uc;uint32_t ue; } Cmd_t;typedef struct Cmd_tag {uint8_t value;uint8_t data[1]; // 将 data 定义为指向 Cmd_t 结构体的指针 } tag_t;在实际使用中&#xff0c;看见前人的代码是&#xff0c;new 一块内…

【Qt问题】windeployqt如何提取Qt依赖库

往期回顾 【Qt问题】Qt Creator 如何链接第三方库-CSDN博客 【Qt问题】Qt 如何带参数启动外部进程-CSDN博客 【Qt问题】VS2019 Qt win32项目如何添加x64编译方式-CSDN博客 【Qt问题】windeployqt如何提取Qt依赖库 考虑这个问题主要是&#xff1a;当我们的程序运行好之后&#…

BI报表大用处 揭秘BI报表在行业中的变革力量

BI报表&#xff0c;即商业智能报表&#xff0c;是一种利用商业智能技术将企业中的数据转换为有意义的信息和可视化展示的报告。它通过将企业内部的大量数据转化为直观、易于理解的图表和指标&#xff0c;帮助决策者快速捕捉关键业务信息&#xff0c;识别趋势和模式&#xff0c;…

【深度学习】Diffusion扩散模型的逆扩散问题

1、前言 上一篇&#xff0c;我们讲了Diffusion这个模型的原理推导。但在推导中&#xff0c;仍然遗留了一些问题。本文将解决那些问题 参考论文&#xff1a; ①Variational Diffusion Models (arxiv.org) ②Tutorial on Diffusion Models for Imaging and Vision (arxiv.org…

训练集、测试集与验证集:机器学习模型评估的基石

在机器学习中&#xff0c;为了评估模型的性能&#xff0c;我们通常会将数据集划分为训练集&#xff08;Training Set&#xff09;、验证集&#xff08;Validation Set&#xff09;和测试集&#xff08;Test Set&#xff09;。这种划分有助于我们更好地理解模型在不同数据上的表…

React 第三十六章 Scheduler 任务调度

Scheduler 用于在 React 应用中进行任务调度。它可以帮助开发人员在处理复杂的任务和操作时更好地管理和优化性能。 关于 Scheduler 在React 如何渲染的可以参考 React 第三十四章 React 渲染流程 下面我们根据流程图先简单的了解 Scheduler 的调度过程 Scheduler 维护两个队…

机器人SCI期刊,中科院3区,收稿范围广泛!

一、期刊名称 Journal of Intelligent & Robotic Systems 二、期刊简介概况 期刊类型&#xff1a;SCI 学科领域&#xff1a;计算机科学 影响因子&#xff1a;3.3 中科院分区&#xff1a;3区 出版方式&#xff1a;开放出版 版面费&#xff1a;$2990 三、期刊征稿范围…

基于springboot实现的家具销售电商平台

开发语言&#xff1a;Java 框架&#xff1a;springboot JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09; 数据库工具&#xff1a;Navicat11 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&…

05-13 周一 量化是什么

05-13 周一 量化是什么 时间版本修改人描述2024年5月13日11:27:25V0.1宋全恒新建文档2024年5月14日16:21:20V1.0宋全恒了解 简介 神经网络在运行时有较高的计算成本&#xff0c;而且随着大模型时代的到来&#xff0c;知识由一个巨大的LLM存储&#xff0c;为了获取知识&#xf…

别再找了!吐血整理ChatGPT 3.5/4.0新手使用手册

引领科技潮流的ChatGPT早已名声在外&#xff0c;如今获取ChatGPT已变得触手可及&#xff0c;但很多人还多次提问如何使用chatgpt&#xff0c;为了避免陷入误区&#xff0c;本文旨在为广大ChatGPT爱好者提供一份实用的指南。 因此&#xff0c;帮助大家更好地掌握其使用技巧&…

免费PPT模板下载,无套路。

身在职场做好PPT是一项必备技能&#xff0c;如何快速做出好看又高级的PPT&#xff0c;收藏好这6个网站&#xff0c;不管你是工作总结、毕业论文、个人简历、企业宣传都能找到合适的模板&#xff0c;最重要的是可以免费下载。 1、菜鸟图库 ppt模板免费下载|ppt背景图片 - 菜鸟图…