系统架构图
XXL-JOB职责划分
准备工作
1.下载代码
https://gitee.com/xuxueli0323/xxl-job/tree/2.3.0/
2.将xxl-job sql执行
3.修改xxl-job-admin数据库链接及账号密码
4.创建日志目录修改日志文件指定输出目录
5.启动项目后访问xxl-job-admin
http://localhost:8080/xxl-job-admin
初次登录 admin 123456
创建执行器
1.创建新项目
2.添加依赖
<!-- xxl-job-core -->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.0</version>
</dependency>
3.添加配置文件application.properties
server.port=8088
### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### xxl-job, access token
xxl.job.accessToken=default_token
### xxl-job executor appname
xxl.job.executor.appname=xxl-job-executor-sample
### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
xxl.job.executor.address=
### xxl-job executor server-info
xxl.job.executor.ip=
xxl.job.executor.port=9998
### xxl-job executor log-path
xxl.job.executor.logpath=/Users/hejiawang/IdeaProjects/xxl-job-demo
### xxl-job executor log-retention-days
xxl.job.executor.logretentiondays=30
4.添加配置类
XxlJobConfig
package com.hejiawang.xxljobdemo.config;
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
}
5.添加简单的执行类
package com.hejiawang.xxljobdemo.job;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.concurrent.TimeUnit;
@Component
public class SimpleXxlJob {
/**
* 1、简单任务示例(Bean模式)
*/
@XxlJob("demoJobHandler")
public void demoJobHandler() throws Exception {
System.out.println("执行定时任务,执行时间:"+new Date());
}
}
6.测试
启动执行器,通过xxl-job管理页面查看到有新的执行任务
http://localhost:8080/xxl-job-admin
7.通过调度中心出发执行器
新增调度任务,在xxl-job-admin页面中新增调度任务5s一次
启动任务
在执行器中即可看到 5s打印一次demoJobHandler
GLUE模式运行
1.新增调度任务
2.新增调用代码 并保存
package com.xxl.job.service.handler;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.IJobHandler;
import com.hejiawang.xxljobdemo.service.HelloService;
import org.springframework.beans.factory.annotation.Autowired;
public class DemoGlueJobHandler extends IJobHandler {
@Autowired
HelloService helloService;
@Override
public void execute() throws Exception {
XxlJobHelper.log("XXL-JOB, Hello World.");
}
}
3.在执行器中添加HelloService类
package com.hejiawang.xxljobdemo.service;
import org.springframework.stereotype.Service;
@Service
public class HelloService {
public void methodA(){
System.out.println("执行methodA的方法");
}
public void methodB(){
System.out.println("执行methodB的方法");
}
}
4.测试
执行一次
执行器中方法被执行
启动即可10s执行一次
执行器集群
1.在IDEA中添加一个进程
在IDEA中添加启动JVM参数
-Dserver.port=8088 -Dxxl.job.executor.port=9998
-Dserver.port=8089 -Dxxl.job.executor.port=9999
启动2个Springboot进程
在xxl-job-admin中查看注册上来2个端口
2.启动之前的定时任务5s一次
只在8088进程中执行
3.调整集群模式下调度任务路由策略
修改为轮训再次启动
8088进程
8089进程
分片功能
1.准备工作
创建分片表
执行sql xxl_job_demo.sql导入数据
一共2000行数据
2.工程中添加xxl-job依赖
<!--MyBatis驱动-->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.2.0</version>
</dependency>
<!--mysql驱动-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!--lombok依赖-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.10</version>
</dependency>
3.添加配置
spring.datasource.url=jdbc:mysql://localhost:3306/xxl_job_demo?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=UTF-8
spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.username=xxx
spring.datasource.password=xxx
4.添加实体类
@Data
public class UserMobilePlan {
private Long id;//主键
private String username;//用户名
private String nickname;//昵称
private String phone;//手机号码
private String info;//备注
}
5.添加mapper类
@Mapper
public interface UserMobilePlanMapper {
@Select("select * from t_user_mobile_plan")
List<UserMobilePlan> selectAll();
}
6.在job类中添加发送短信模拟方法
@XxlJob("sendMsgHandler")
public void sendMsgHandler() throws Exception{
List<UserMobilePlan> userMobilePlans = userMobilePlanMapper.selectAll();
System.out.println("任务开始时间:"+new Date()+",处理任务数量:"+userMobilePlans.size());
Long startTime = System.currentTimeMillis();
userMobilePlans.forEach(item->{
try {
//模拟发送短信动作
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println("任务结束时间:"+new Date());
System.out.println("任务耗时:"+(System.currentTimeMillis()-startTime)+"毫秒");
}
7.启动执行器
8.在xxl-job-admin中创建调度器并且未分片执行
1分钟执行一次,并且启动
在执行器中可以看到一次读了2000条数据,并未分片,并且需要20s才处理完
9.在执行器中分片执行
将调度任务路由策略修改为分片广播的形式
修改mapper方法,新增取模条件查询
@Mapper
public interface UserMobilePlanMapper {
@Select("select * from t_user_mobile_plan")
List<UserMobilePlan> selectAll();
@Select("select * from t_user_mobile_plan where mod(id,#{shardingTotal})=#{shardingIndex}")
List<UserMobilePlan> selectByMod(@Param("shardingIndex") Integer shardingIndex, @Param("shardingTotal")Integer shardingTotal);
}
修改 sendMsgHandler
@XxlJob("sendMsgHandler")
public void sendMsgHandler() throws Exception {
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
System.out.println("分片的总数:"+shardTotal+",分片的索引:"+shardIndex);
List<UserMobilePlan> userMobilePlans = null;
if(shardTotal==1){
//如果没有分片就直接查询所有数据
userMobilePlans = userMobilePlanMapper.selectAll();
}else{
userMobilePlans = userMobilePlanMapper.selectByMod(shardIndex,shardTotal);
}
System.out.println("处理任务数量:"+userMobilePlans.size());
Long startTime = System.currentTimeMillis();
userMobilePlans.forEach(
item -> {
try {
// 模拟发送短信动作
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println("任务结束时间:" + new Date());
System.out.println("任务耗时:" + (System.currentTimeMillis() - startTime) + "毫秒");
}
重启2个执行器进程
将调度任务改为分片广播启动
即可看到每个进程处理一半的数据并且每个都10s处理完