整合xxl-job
1.部署调度中心
-
将doc目录下的sql脚本导入数据库
-
修改日志位置
-
maven打包mvn pageage -Dmaven.skip.test=true
-
后台方式启动 nohup java -jar xxl-job-admin-2.3.0.jar > tag-web.log 2>&1 &
-
访问localhost:端口/xxl-job-admin
2.部署执行器
-
修改调用中心地址
-
修改日志位置
-
maven打包后台方式启动 nohup java -jar xxl-job-executor-sample-springboot> tag-web.log 2>&1 &
3.秒杀系统接入xxl-job定时任务
-
实现流程
-
新增执行器
-
新增任务管理器
-
依赖
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.0</version>
</dependency>
-
yml配置
# xxl-job配置 xxl: job: admin: # 调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;xxl-job后台管理界面的地址 addresses: http://127.0.0.1:8091/xxl-job-admin executor: # 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。 address: # 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册 appname: seckill-job # 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务"; ip: # 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口; port: 0 # 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径; logpath: /Users/liubo/logs/test-job # 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能; logretentiondays: 15 # 执行器通讯TOKEN [选填]:非空时启用; accessToken:
-
config类
@Slf4j @Configuration public class XxlJobConfig { @Value("${xxl.job.admin.addresses}") private String adminAddresses; @Value("${xxl.job.accessToken}") private String accessToken; @Value("${xxl.job.executor.address}") private String address; @Value("${xxl.job.executor.appname}") private String appName; @Value("${xxl.job.executor.ip}") private String ip; @Value("${xxl.job.executor.port}") private int port; @Value("${xxl.job.executor.logpath}") private String logPath; @Value("${xxl.job.executor.logretentiondays}") private int logRetentionDays; @Bean public XxlJobSpringExecutor xxlJobExecutor() { log.info("====xxl-job config init===="); XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appName); xxlJobSpringExecutor.setIp(ip); xxlJobSpringExecutor.setPort(port); xxlJobSpringExecutor.setAccessToken(accessToken); xxlJobSpringExecutor.setLogPath(logPath); xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays); return xxlJobSpringExecutor; } }
-
定时任务
@Slf4j @Service public class SeckillScheduled { @Autowired private SeckillService seckillService; @XxlJob("seckillScheduled") public void uploadSeckillSkuLatest3Days() { log.info("接收到定时任务触发"); // 任务配置路由策略为第一台或者一致性hash可以保证分布式情况下,只会有一台机器执行一次此任务 seckillService.uploadSeckillSkuLatest3Days(); } }
-
具体实现
@Service public class SeckillServiceImpl implements SeckillService { @Autowired private StringRedisTemplate redisTemplate; @Autowired private ProductFeignService productFeignService; @Autowired private SeckillSessionFeignService seckillSessionFeignService; @Autowired private RedissonClient redissonClient; @Override public void uploadSeckillSkuLatest3Days() { R lates3DaySession = seckillSessionFeignService.getLates3DaySession(); if (lates3DaySession.getCode() == 0) { List<SeckillSessionWithSkusVo> sessionData = lates3DaySession.getData(new TypeReference<List<SeckillSessionWithSkusVo>>() {}); //1、缓存活动信息 saveSessionInfos(sessionData); //2、缓存活动的关联商品信息 saveSessionSkuInfo(sessionData); } } /** * 缓存活动的关联商品信息 * @param sessions */ private void saveSessionSkuInfo(List<SeckillSessionWithSkusVo> sessions) { for (SeckillSessionWithSkusVo session : sessions) { //准备hash操作,绑定hash BoundHashOperations<String, Object, Object> operations = redisTemplate.boundHashOps(SeckillConstant.SECKILL_CHARE_PREFIX); if (CollectionUtil.isNotEmpty(session.getRelationSkus())){ session.getRelationSkus().forEach(seckillSkuVo -> { //生成随机码 String token = UUID.randomUUID().toString().replace("-", ""); String redisKey = seckillSkuVo.getPromotionSessionId().toString() + "-" + seckillSkuVo.getSkuId().toString(); if (!operations.hasKey(redisKey)) { //缓存我们商品信息 SeckillSkuRedisTo redisTo = new SeckillSkuRedisTo(); Long skuId = seckillSkuVo.getSkuId(); //1、先查询sku的基本信息,调用远程服务 R info = productFeignService.getSkuInfo(skuId); if (info.getCode() == 0) { SkuInfoVo skuInfo = info.getData("skuInfo",new TypeReference<SkuInfoVo>(){}); redisTo.setSkuInfo(skuInfo); } //2、sku的秒杀信息 BeanUtils.copyProperties(seckillSkuVo,redisTo); //3、设置当前商品的秒杀时间信息 redisTo.setStartTime(session.getStartTime().getTime()); redisTo.setEndTime(session.getEndTime().getTime()); //4、设置商品的随机码(防止恶意攻击) redisTo.setRandomCode(token); //序列化json格式存入Redis中 String seckillValue = JSON.toJSONString(redisTo); operations.put(redisKey,seckillValue); //如果当前这个场次的商品库存信息已经上架就不需要上架 //5、使用库存作为分布式Redisson信号量(限流) // 使用库存作为分布式信号量 RSemaphore semaphore = redissonClient.getSemaphore(SeckillConstant.SKU_STOCK_SEMAPHORE + token); // 商品可以秒杀的数量作为信号量 semaphore.trySetPermits(seckillSkuVo.getSeckillCount()); } }); } } } /** * 缓存活动信息 * @param sessions */ private void saveSessionInfos(List<SeckillSessionWithSkusVo> sessions) { for (SeckillSessionWithSkusVo session : sessions) { //获取当前活动的开始和结束时间的时间戳 long startTime = session.getStartTime().getTime(); long endTime = session.getEndTime().getTime(); //存入到Redis中的key String key = SeckillConstant.SESSION_CACHE_PREFIX + startTime + "_" + endTime; //判断Redis中是否有该信息,如果没有才进行添加 Boolean hasKey = redisTemplate.hasKey(key); //缓存活动信息 if (!hasKey) { //获取到活动中所有商品的skuId if (CollectionUtil.isNotEmpty(session.getRelationSkus())){ List<String> skuIds = session.getRelationSkus().stream() .map(item -> item.getPromotionSessionId() + "-" + item.getSkuId().toString()).collect(Collectors.toList()); redisTemplate.opsForList().leftPushAll(key, skuIds); } } } } }