【黑马头条】-day05延迟队列文章发布审核-Redis-zSet实现延迟队列-Feign远程调用

news2024/10/5 17:28:21

文章目录

  • 昨日回顾
  • 今日内容
  • 1 延迟任务
    • 1.1 概述
    • 1.2 技术对比
      • 1.2.1 DelayQueue
      • 1.2.2 RabbitMQ
      • 1.2.3 Redis实现
      • 1.2.4 总结
  • 2 redis实现延迟任务
    • 2.0 实现思路
    • 2.1 思考
    • 2.2 初步配置实现
      • 2.2.1 导入heima-leadnews-schedule模块
      • 2.2.2 在Nacos注册配置管理leadnews-schedule
      • 2.2.3 导入表结构
      • 2.2.4 根据表结构导入实体类及其mapper
      • 2.2.5 表结构中的乐观锁
        • 2.2.5.1 在启动类中加入乐观锁的拦截器
      • 2.2.6 安装redis
      • 2.2.7 在项目中集成redis
        • 2.2.7.1 导入redis依赖
        • 2.2.7.2 为redis添加连接配置
        • 2.2.7.3 拷贝工具类CacheService
        • 2.2.7.4 将CacheService注册到spring自动配置
        • 2.2.7.5 测试List
        • 2.2.7.6 测试Zset
    • 2.3 添加任务
      • 2.3.1 导入task类
      • 2.3.2 创建TaskService
      • 2.3.3 测试
    • 2.4 取消任务
      • 2.4.1 Service
      • 2.4.2 测试
    • 2.5 拉取任务
      • 2.5.1 Service
      • 2.5.2 测试
    • 2.6 定时刷新
      • 2.6.1 如何获取zset中所有的key?
      • 2.6.2 数据如何同步?
      • 2.6.3 Redis管道
      • 2.6.4 zSet和List数据同步实现
      • 2.6.5 开启定时任务
      • 2.6.6 分布式下的Schedule
      • 2.6.7 Redis分布式锁
      • 2.6.8 数据库和Redis的同步
    • 2.7 延迟队列对外接口
      • 2.7.1 IScheduleClinet接口
      • 2.7.2 在微服务中实现类
    • 2.8 发布文章集成延迟队列
      • 2.8.1 添加askTypeEnum类枚举类
      • 2.8.2 Task的参数序列化
      • 2.8.3 实现文章发布集成接口及实现类
      • 2.8.4 修改文章发布逻辑
      • 2.8.5 启动测试
    • 2.9 消费任务审核文章
      • 2.9.1 综合测试


昨日回顾

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

今日内容

在这里插入图片描述

1 延迟任务

1.1 概述

在这里插入图片描述

1.2 技术对比

1.2.1 DelayQueue

在这里插入图片描述

1.2.2 RabbitMQ

在这里插入图片描述

1.2.3 Redis实现

在这里插入图片描述

1.2.4 总结

在这里插入图片描述

2 redis实现延迟任务

2.0 实现思路

在这里插入图片描述

2.1 思考

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

2.2 初步配置实现

在这里插入图片描述

2.2.1 导入heima-leadnews-schedule模块

在这里插入图片描述

在这里插入图片描述

2.2.2 在Nacos注册配置管理leadnews-schedule

spring:
  redis:
    host: 192.168.204.129
    password: leadnews
    port: 6379
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/leadnews_schedule?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false
    username: root
    password: 123sjbsjb

# 设置Mapper接口所对应的XML文件位置,如果你在Mapper接口中有自定义方法,需要进行该配置
mybatis-plus:
  mapper-locations: classpath*:mapper/*.xml
  # 设置别名包扫描路径,通过该属性可以给包中的类注册别名
  type-aliases-package: com.heima.model.schedule.pojos

minio:
  accessKey: minio
  secretKey: minio123
  bucket: leadnews
  endpoint: http://192.168.204.129:9000
  readPath: http://192.168.204.129:9000

在这里插入图片描述

2.2.3 导入表结构

在这里插入图片描述

在这里插入图片描述

2.2.4 根据表结构导入实体类及其mapper

导入heima-leadnews-model模块下com.heima.model.schedule下导入两个Taskinfo和TaskinfoLogs实体类

@Data
@TableName("taskinfo")
public class Taskinfo implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 任务id
     */
    @TableId(type = IdType.ID_WORKER)
    private Long taskId;

    /**
     * 执行时间
     */
    @TableField("execute_time")
    private Date executeTime;

    /**
     * 参数
     */
    @TableField("parameters")
    private byte[] parameters;

    /**
     * 优先级
     */
    @TableField("priority")
    private Integer priority;

    /**
     * 任务类型
     */
    @TableField("task_type")
    private Integer taskType;


}
@Data
@TableName("taskinfo_logs")
public class TaskinfoLogs implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 任务id
     */
    @TableId(type = IdType.ID_WORKER)
    private Long taskId;

    /**
     * 执行时间
     */
    @TableField("execute_time")
    private Date executeTime;

    /**
     * 参数
     */
    @TableField("parameters")
    private byte[] parameters;

    /**
     * 优先级
     */
    @TableField("priority")
    private Integer priority;

    /**
     * 任务类型
     */
    @TableField("task_type")
    private Integer taskType;

    /**
     * 版本号,用乐观锁
     */
    @Version
    private Integer version;

    /**
     * 状态 0=int 1=EXECUTED 2=CANCELLED
     */
    @TableField("status")
    private Integer status;


}

对应mapper

@Mapper
public interface TaskinfoLogsMapper extends BaseMapper<TaskinfoLogs> {

}
@Mapper
public interface TaskinfoMapper extends BaseMapper<Taskinfo> {

    public List<Taskinfo> queryFutureTime(@Param("taskType")int taskType, @Param("priority")int priority, @Param("future")Date future);
}

TaskinfoMapper对应的mybatis的xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.heima.schedule.mapper.TaskinfoMapper">

    <select id="queryFutureTime" resultType="com.heima.model.schedule.pojos.Taskinfo">
        select *
        from taskinfo
        where task_type = #{taskType}
          and priority = #{priority}
          and execute_time <![CDATA[<]]> #{future,javaType=java.util.Date}
    </select>

</mapper>

2.2.5 表结构中的乐观锁

@Version
private Integer version;

在这里插入图片描述

在这里插入图片描述

2.2.5.1 在启动类中加入乐观锁的拦截器

在heima-leadnews-schedule模块下的启动类中加入乐观锁的拦截器

@SpringBootApplication
@MapperScan("com.heima.schedule.mapper")
public class ScheduleApplication {

    public static void main(String[] args) {
        SpringApplication.run(ScheduleApplication.class,args);
    }
    @Bean
    public MybatisPlusInterceptor optimisticLockerInterceptor(){
        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
        interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
        return interceptor;
    }
}

2.2.6 安装redis

移除已有redis

docker rm redis

创建新的redis容器

docker run -d --name redis --restart=always -p 6379:6379 redis --requirepass "leadnews"

密码leadnews

在这里插入图片描述

2.2.7 在项目中集成redis

2.2.7.1 导入redis依赖

在heima-leadnews-common模块中添加redis依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- redis依赖commons-pool 这个依赖一定要添加 -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>
2.2.7.2 为redis添加连接配置

在nacos中的leadnews-schedule配置中心为redis添加配置

spring:
  redis:
    host: 192.168.204.129
    password: leadnews
    port: 6379

在这里插入图片描述

2.2.7.3 拷贝工具类CacheService

拷贝工具类CacheService到heima-leadnews-common的com.heima.common.redis下

在这里插入图片描述

2.2.7.4 将CacheService注册到spring自动配置

在这里插入图片描述

2.2.7.5 测试List

在heima-leadnews-schedule中创建RedisTest测试类

在这里插入图片描述

@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
public class RedisTest {
    @Autowired
    private CacheService cacheService;
    @Test
    public void testList() {
        //在List的左边添加元素
        cacheService.lLeftPush("list_001", "hello,redis");
    }
}

在这里插入图片描述

@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
public class RedisTest {
    @Autowired
    private CacheService cacheService;
    @Test
    public void testList() {
        //在List的左边添加元素
        //cacheService.lLeftPush("list_001", "hello,redis");

        //在List的右边获取元素并删除
        String value = cacheService.lRightPop("list_001");
        System.out.println(value);
    }
}

在这里插入图片描述

查看redis发现已经没有数据了

在这里插入图片描述

2.2.7.6 测试Zset
@Test
public void testZset() {
    //添加元素到Zset中,按照分值
    cacheService.zAdd("zset_key_001", "hello zset 001", 1000);
    cacheService.zAdd("zset_key_002", "hello zset 002", 8888);
    cacheService.zAdd("zset_key_003", "hello zset 003", 7777);
    cacheService.zAdd("zset_key_004", "hello zset 004", 99999);
    //按照分值获取元素
}

在这里插入图片描述

获取前三条数据

@Test
public void testZset() {
    /*//添加元素到Zset中,按照分值
    cacheService.zAdd("zset_key_001", "hello zset 001", 1000);
    cacheService.zAdd("zset_key_001", "hello zset 002", 8888);
    cacheService.zAdd("zset_key_001", "hello zset 003", 7777);
    cacheService.zAdd("zset_key_001", "hello zset 004", 99999);*/
    //按照分值获取元素
    Set<String> zset_key_001 = cacheService.zRangeByScore("zset_key_001", 0, 8888);
    System.out.println(zset_key_001);
}

在这里插入图片描述

2.3 添加任务

在这里插入图片描述

2.3.1 导入task类

@Data
public class Task implements Serializable {

    /**
     * 任务id
     */
    private Long taskId;
    /**
     * 类型
     */
    private Integer taskType;

    /**
     * 优先级
     */
    private Integer priority;

    /**
     * 执行id
     */
    private long executeTime;

    /**
     * task参数
     */
    private byte[] parameters;
    
}

2.3.2 创建TaskService

在heima-leadnews-schedule模块下创建com.heima.schedule.service.TaskService接口及实现

public interface TaskService {
    /**
     * 添加延迟任务
     * @param task
     * @return
     */
    long addTask(Task task);
}

实现包含

1.添加任务到数据库中

2.添加任务到redis中

2.1 如果任务的执行时间小于当前时间,直接执行任务

2.2 如果任务的执行时间大于当前时间&&小于等于预设时间,添加到延迟队列中

@Service
@Slf4j
public class TaskServiceImpl implements TaskService {
    /**
     * 添加延迟任务
     * @param task
     * @return
     */
    @Override
    public long addTask(Task task) {
        //1.添加任务到数据库中
        boolean success= addTaskToDB(task);
        //2.添加任务到redis中
        if(success){
            addTaskToRedis(task);
        }
        return task.getTaskId();
    }

    @Autowired
    private CacheService cacheService;
    /**
     * 添加任务到redis中
     * @param task
     */
    private void addTaskToRedis(Task task) {
        String key = task.getTaskType()+"_"+task.getPriority();
        //获取预设时间,5分钟后
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.MINUTE,5);
        long nextSchedule = calendar.getTimeInMillis();
        //2.1 如果任务的执行时间小于当前时间,直接执行任务
        if(task.getExecuteTime()<=System.currentTimeMillis()){
            cacheService.lLeftPush(ScheduleConstants.TOPIC+key, JSON.toJSONString(task));
        }
        //2.2 如果任务的执行时间大于当前时间&&小于等于预设时间,添加到延迟队列中
        else if(task.getExecuteTime()>System.currentTimeMillis()&&task.getExecuteTime()<=nextSchedule){
            cacheService.zAdd(ScheduleConstants.FUTURE+key,JSON.toJSONString(task),task.getExecuteTime());
        }
    }

    @Autowired
    private TaskinfoMapper taskinfoMapper;
    @Autowired
    private TaskinfoLogsMapper taskinfoLogsMapper;
    /**
     * 添加任务到数据库中
     * @param task
     * @return
     */
    private boolean addTaskToDB(Task task) {
        boolean flag = false;
        try {
            //1.保存任务表
            Taskinfo taskinfo = new Taskinfo();
            BeanUtils.copyProperties(task,taskinfo);
            taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
            taskinfoMapper.insert(taskinfo);
            //设置Task的id
            task.setTaskId(taskinfo.getTaskId());
            //2.保存任务日志表
            TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
            BeanUtils.copyProperties(taskinfo,taskinfoLogs);
            taskinfoLogs.setVersion(1);
            taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
            taskinfoLogsMapper.insert(taskinfoLogs);
            flag = true;
        } catch (Exception e) {
            log.error("添加任务到数据库失败",e);
            e.printStackTrace();
        }

        return flag;
    }
}

还有个常量类放入heima-leadnews-common模块下的com.heima.common.constant包下

package com.heima.common.constants;

public class ScheduleConstants {

    //task状态
    public static final int SCHEDULED=0;   //初始化状态

    public static final int EXECUTED=1;       //已执行状态

    public static final int CANCELLED=2;   //已取消状态

    public static String FUTURE="future_";   //未来数据key前缀

    public static String TOPIC="topic_";     //当前数据key前缀
}

2.3.3 测试

public class TaskServiceImpl implements TaskService点击TaskService,CONTROL+SHIFT+T创建测试

在这里插入图片描述

@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
@Slf4j
class TaskServiceImplTest {
    @Autowired
    private TaskService taskService;

    @Test
    void addTask() {
        Task task = new Task();
        task.setTaskType(100);
        task.setPriority(50);
        task.setExecuteTime(new Date().getTime()+2000);
        task.setParameters("task test".getBytes());

        long taskId = taskService.addTask(task);
        log.info("taskId:{}", taskId);
    }
}

在这里插入图片描述

显示如此

2.4 取消任务

在这里插入图片描述

2.4.1 Service

boolean deleteTask(Long taskId);
/**
 * 删除任务
 * @param taskId
 * @return
 */
@Override
public boolean deleteTask(Long taskId) {
    boolean flag = false;
    //1.删除数据库中的任务
    int success = taskinfoMapper.deleteById(taskId);
    if(success==0){
        return flag;
    }
    try {
        //2.更新日志状态
        TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);
        taskinfoLogs.setStatus(ScheduleConstants.CANCELLED);
        taskinfoLogsMapper.updateById(taskinfoLogs);
        //3.删除redis中的任务
        Task task = new Task();
        BeanUtils.copyProperties(taskinfoLogs,task);
        task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
        String key = task.getTaskType()+"_"+task.getPriority();
        if(task.getExecuteTime()<=System.currentTimeMillis()) {
            cacheService.lRemove(ScheduleConstants.TOPIC + key, 0, JSON.toJSONString(task));
        }else{
            cacheService.zRemove(ScheduleConstants.FUTURE + key, JSON.toJSONString(task));
        }
        flag = true;
    } catch (Exception e) {
        log.error("删除任务失败",e);
        e.printStackTrace();
    }
    return flag;
}

2.4.2 测试

@Test
void deleteTask() {
    boolean flag = taskService.deleteTask(1773909243989106689);
    log.info("flag:{}", flag);
}

在这里插入图片描述

2.5 拉取任务

在这里插入图片描述

2.5.1 Service

Task poll(int type, int priority);
/**
 * 按照类型和优先级拉取任务
 * @return
 */
@Override
public Task poll(int type,int priority) {
    Task task = null;
    try {
        String key = type+"_"+priority;
        String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
        if(StringUtils.isNotBlank(task_json)){
            task = JSON.parseObject(task_json, Task.class);
            //更新数据库信息
            updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);
        }
    }catch (Exception e){
        e.printStackTrace();
        log.error("poll task exception");
    }

    return task;
}

2.5.2 测试

@Test
void testPoll() {
    Task task = taskService.poll(100, 50);
    log.info("task:{}", task);
}

拉取成功

在这里插入图片描述

2.6 定时刷新

在这里插入图片描述

在这里插入图片描述

2.6.1 如何获取zset中所有的key?

在这里插入图片描述

在这里插入图片描述

@Test
public void testKeys() {
	Set<String> keys = cacheService.keys(ScheduleConstants.FUTURE + "*");
	System.out.println("方式一:");
	System.out.println(keys);
    Set<String> scan = cacheService.scan(ScheduleConstants.FUTURE + "*");
    System.out.println("方式二:");
    System.out.println(scan);
}

2.6.2 数据如何同步?

在这里插入图片描述

2.6.3 Redis管道

在这里插入图片描述

在这里插入图片描述

//耗时6151
@Test
public  void testPiple1(){
    long start =System.currentTimeMillis();
    for (int i = 0; i <10000 ; i++) {
        Task task = new Task();
        task.setTaskType(1001);
        task.setPriority(1);
        task.setExecuteTime(new Date().getTime());
        cacheService.lLeftPush("1001_1", JSON.toJSONString(task));
    }
    System.out.println("耗时"+(System.currentTimeMillis()- start));
}


@Test
public void testPiple2(){
    long start  = System.currentTimeMillis();
    //使用管道技术
    List<Object> objectList = cacheService.getstringRedisTemplate().executePipelined(new RedisCallback<Object>() {
        @Nullable
        @Override
        public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
            for (int i = 0; i <10000 ; i++) {
                Task task = new Task();
                task.setTaskType(1001);
                task.setPriority(1);
                task.setExecuteTime(new Date().getTime());
                redisConnection.lPush("1001_1".getBytes(), JSON.toJSONString(task).getBytes());
            }
            return null;
        }
    });
    System.out.println("使用管道技术执行10000次自增操作共耗时:"+(System.currentTimeMillis()-start)+"毫秒");
}

使用管道技术执行10000次自增操作共耗时:2481毫秒

2.6.4 zSet和List数据同步实现

Cron表达式 @Scheduled(cron="0 */1 * * * ?")

在TaskService中添加方法

public void refresh()
/**
 * 定时刷新队列,每分钟刷新
 */
@Override
@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {
    log.info(System.currentTimeMillis() / 1000 + "执行了定时任务");

    // 获取所有未来数据集合的key值
    Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*
    for (String futureKey : futureKeys) { // future_250_250

        String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
        //获取该组key下当前需要消费的任务数据
        Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
        if (!tasks.isEmpty()) {
            //将这些任务数据添加到消费者队列中
            cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
            log.info("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");
        }
    }
}

新增测试方法

public void addTaskNew() {
    for (int i = 0; i < 5; i++) {
        Task task = new Task();
        task.setTaskType(100 + i);
        task.setPriority(50);
        task.setParameters("task test".getBytes());
        task.setExecuteTime(new Date().getTime() + 500 * i);

        long taskId = taskService.addTask(task);
    }
} 

2.6.5 开启定时任务

在启动类中添加@EnableScheduling

@SpringBootApplication
@MapperScan("com.heima.schedule.mapper")
@EnableScheduling
public class ScheduleApplication {

    public static void main(String[] args) {
        SpringApplication.run(ScheduleApplication.class,args);
    }
    @Bean
    public MybatisPlusInterceptor optimisticLockerInterceptor(){
        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
        interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
        return interceptor;
    }
}

启动ScheduleApplication

未来任务已经刷新

在这里插入图片描述

2.6.6 分布式下的Schedule

再启动一个ScheduleApplication端口为51702

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

2.6.7 Redis分布式锁

在这里插入图片描述

在heima-leadnews-common的工具类com.heima.common.redis.CacheService中添加方法

/**
 * 加锁
 *
 * @param name
 * @param expire
 * @return
 */
public String tryLock(String name, long expire) {
    name = name + "_lock";
    String token = UUID.randomUUID().toString();
    RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();
    RedisConnection conn = factory.getConnection();
    try {

        //参考redis命令:
        //set key value [EX seconds] [PX milliseconds] [NX|XX]
        Boolean result = conn.set(
                name.getBytes(),
                token.getBytes(),
                Expiration.from(expire, TimeUnit.MILLISECONDS),
                RedisStringCommands.SetOption.SET_IF_ABSENT //NX
        );
        if (result != null && result)
            return token;
    } finally {
        RedisConnectionUtils.releaseConnection(conn, factory,false);
    }
    return null;
}

在定时刷新前加上锁操作

@Override
@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {

    String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);
    if (StringUtils.isBlank(token)) {
        log.info(System.currentTimeMillis() / 1000 + "执行了定时任务");

        // 获取所有未来数据集合的key值
        Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*
        for (String futureKey : futureKeys) { // future_250_250

            String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
            //获取该组key下当前需要消费的任务数据
            Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
            if (!tasks.isEmpty()) {
                //将这些任务数据添加到消费者队列中
                cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
                log.info("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");
            }
        }
    }
}

在这里插入图片描述

在这里插入图片描述

2.6.8 数据库和Redis的同步

在这里插入图片描述

在com.heima.schedule.service.impl.TaskServiceImpl中添加新的reloadData方法,数据库任务定时同步到redis中

@PostConstruct是开机自动同步

@Scheduled(cron = "0 */5 * * * ?")
@PostConstruct
public void reloadData() {
    clearCache();
    log.info("数据库数据同步到缓存");
    Calendar calendar = Calendar.getInstance();
    calendar.add(Calendar.MINUTE, 5);

    //查看小于未来5分钟的所有任务
    List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime()));
    if(allTasks != null && allTasks.size() > 0){
        for (Taskinfo taskinfo : allTasks) {
            Task task = new Task();
            BeanUtils.copyProperties(taskinfo,task);
            task.setExecuteTime(taskinfo.getExecuteTime().getTime());
            addTaskToRedis(task);
        }
    }
}

private void clearCache(){
    // 删除缓存中未来数据集合和当前消费者队列的所有key
    Set<String> futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_
    Set<String> topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");// topic_
    cacheService.delete(futurekeys);
    cacheService.delete(topickeys);
}

在这里插入图片描述

删除redis中数据,重新启动服务

在这里插入图片描述

同步成功

2.7 延迟队列对外接口

在这里插入图片描述

2.7.1 IScheduleClinet接口

对外通过Fegin进行接口调用

在heima-leadnews-feign-api模块下创建com.heima.apis.schedule包

再创建接口IScheduleClinet接口,将com.heima.schedule.service.TaskService接口的东西复制过来

@FeignClient(value = "leadnews-schedule")
public interface IScheduleClient {
    /**
     * 添加延迟任务
     * @param task
     * @return
     */
    @PostMapping("/api/v1/task/add")
    public ResponseResult addTask(@RequestBody Task task);

    /**
     * 删除任务
     * @param taskId
     * @return
     */
    @GetMapping("/api/v1/task/{taskId}")
    public ResponseResult cancelTask(@PathVariable("taskId") long taskId);

    /**
     * 按照类型和优先级拉取
     * @param type
     * @param priority
     * @return
     */
    @GetMapping("/api/v1/{type}/{priority}")
    public ResponseResult poll(@PathVariable("type")int type, @PathVariable("priority")int priority);
}

2.7.2 在微服务中实现类

在heima-leadnews-schedule模块下创建com.heima.schedule.feign.ScheduleClient实现类(充当Controller)

@RestController
public class ScheduleClient implements IScheduleClient {
    @Autowired
    private TaskService taskService;

    /**
     * 添加延迟任务
     * @param task
     * @return
     */
    @PostMapping("/api/v1/task/add")
    @Override
    public ResponseResult addTask(@RequestBody Task task){
        return ResponseResult.okResult(taskService.addTask(task));
    }

    /**
     * 删除任务
     * @param taskId
     * @return
     */
    @GetMapping("/api/v1/task/{taskId}")
    @Override
    public ResponseResult cancelTask(@PathVariable("taskId") long taskId){
        return ResponseResult.okResult(taskService.cancelTask(taskId));
    }

    /**
     * 按照类型和优先级拉取
     * @param type
     * @param priority
     * @return
     */
    @GetMapping("/api/v1/task/{type}/{priority}")
    @Override
    public ResponseResult poll(@PathVariable("type")int type, @PathVariable("priority")int priority){
        return ResponseResult.okResult(taskService.poll(type, priority));
    }
}

2.8 发布文章集成延迟队列

在这里插入图片描述

2.8.1 添加askTypeEnum类枚举类

定义枚举类com.heima.model.common.enums.TaskTypeEnum类

@Getter
@AllArgsConstructor
public enum TaskTypeEnum {

    NEWS_SCAN_TIME(1001, 1,"文章定时审核"),
    REMOTEERROR(1002, 2,"第三方接口调用失败,重试");
    private final int taskType; //对应具体业务
    private final int priority; //业务不同级别
    private final String desc; //描述信息
}

2.8.2 Task的参数序列化

Task的参数是一个二进制数据,所以需要序列化

引入序列化工具

在这里插入图片描述

导入两个工具类

在这里插入图片描述

导入依赖

<dependency>
    <groupId>io.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.6.0</version>
</dependency>

<dependency>
    <groupId>io.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.6.0</version>
</dependency>

2.8.3 实现文章发布集成接口及实现类

添加com.heima.wemedia.service.WmNewsTaskService接口

public interface WmNewsTaskService {
    /**
     * 添加文章自动发布任务
     * @param id 文章id
     * @param publishTime 发布时间
     */
    public void addNewsToTask(Integer id, Date publishTime);
}

实现类com.heima.wemedia.service.impl.WmNewsTaskServiceImpl

@Service
@Slf4j
public class WmNewsTaskServiceImpl implements WmNewsTaskService {
    @Autowired
    private IScheduleClient scheduleClient;
    @Override
    public void addNewsToTask(Integer id, Date publishTime) {
        log.info("添加文章自动发布任务,文章id:{},发布时间:{}",id,publishTime);
        Task task = new Task();
        task.setExecuteTime(publishTime.getTime());
        task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());
        task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
        WmNews wmNews = new WmNews();
        wmNews.setId(id);
        task.setParameters(ProtostuffUtil.serialize(wmNews));
        scheduleClient.addTask(task);
        log.info("添加文章自动发布任务成功");
    }
}

2.8.4 修改文章发布逻辑

修改com.heima.wemedia.service.impl.WmNewsServiceImpl逻辑

第五步审核时,把任务先放到队列中,放在队列中再通过拉取任务进行审核

@Autowired
private WmNewAutoScanService wmNewAutoScanService;
@Autowired
private WmNewsTaskService wmNewsTaskService;
@Override
public ResponseResult submitNews(WmNewsDto wmNewsDto) {
    // 0.参数检查
    if(wmNewsDto == null||wmNewsDto.getContent()==null){
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }
    //1. 保存或修改文章
    WmNews wmNews = new WmNews();
    BeanUtils.copyProperties(wmNewsDto,wmNews);
    //1.1 封面
    if(wmNewsDto.getImages()!=null&& wmNewsDto.getImages().size()>0){
        String imageStr = StringUtils.join(wmNewsDto.getImages(), ",");
        wmNews.setImages(imageStr);
    }
    //1.2 如果封面为自动-1,则需要手动设置封面规则
    if(wmNewsDto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)){
        wmNews.setType(null);
    }
    saveOrUpdateWmNews(wmNews);
    //2.判断是否为草稿,如果为草稿结束当前方法
    if(wmNews.getStatus().equals(WmNews.Status.NORMAL.getCode())){
        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    }
    //3.不是草稿,保存文章内容与图片素材的关系
    //3.1 获取文章内容的图片素材
    List<String> imageList=extractUrlInfo(wmNewsDto.getContent());
    saveRelativeInfoForContent(imageList,wmNews.getId());

    //4.不是草稿,保存文章封面图片与图片素材的关系
    saveRelativeInfoForCover(wmNewsDto,wmNews,imageList);

    //5.审核文章
    //wmNewAutoScanService.autoScanMediaNews(wmNews.getId());
    //将文章id和发布时间添加到任务中
    wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNewsDto.getPublishTime());

    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);

}

2.8.5 启动测试

在这里插入图片描述

在这里插入图片描述

2.9 消费任务审核文章

修改com.heima.wemedia.service.impl.WmNewsServiceImpl逻辑

@Autowired
private WmNewAutoScanService wmNewAutoScanService;
/**
 * 消费任务,审核文章
 */
@Override
@Async
@Scheduled(fixedRate = 1000)
public void scanNewsByTask() {
    log.info("开始执行文章自动审核任务");
    ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
    if(responseResult.getCode().equals(200)&&responseResult.getData()!=null){
        log.info("task:{}",responseResult.getData());
        String jsonTask = JSON.toJSONString(responseResult.getData());
        Task task = JSON.parseObject(jsonTask, Task.class);
        //逆序列化任务参数拿到id
        WmNews wmNews = ProtostuffUtil.deserialize(task.getParameters(), WmNews.class);
        wmNewAutoScanService.autoScanMediaNews(wmNews.getId());
    }
}

这个方法并不会被调用,只需要按照一定频率拉取任务

因此添加@Scheduled(fixedRate = 1000)1s中拉取一次

在这里插入图片描述

同时需要在WediaAppilcation启动类添加@EnableScheduling

@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("com.heima.wemedia.mapper")
@EnableFeignClients(basePackages = "com.heima.apis")
@EnableAsync
@EnableScheduling
public class WemediaApplication {

    public static void main(String[] args) {
        SpringApplication.run(WemediaApplication.class,args);
    }

    @Bean
    public MybatisPlusInterceptor mybatisPlusInterceptor() {
        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
        interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
        return interceptor;
    }
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter);
        return rabbitTemplate;
    }
}

2.9.1 综合测试

发布一个即时任务

在这里插入图片描述

发布一个延迟任务

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

查看控制台
在这里插入图片描述

25分即将被消费

在这里插入图片描述

状态为1表示消费成功!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1565044.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【单片机家电产品学习记录--红外线】

单片机家电产品学习记录–红外线 红外手势驱动电路&#xff0c;&#xff08;手势控制的LED灯&#xff09; 原理 通过红外线对管&#xff0c;IC搭建的电路&#xff0c;实现灯模式转换。 手势控制灯模式转换&#xff0c;详细说明 转载 1《三色调光LED台灯电路》&#xff0c…

大数据学习第十一天(复习linux指令3)

1、su和exit su命令就是用于账户切换的系统命令 基本语法&#xff1a;su[-] [用户名] 1&#xff09;-表示是否在切换用户后加载变量&#xff0c;建议带上 2&#xff09;参数&#xff1a;用户名&#xff0c;表示切换用户 3&#xff09;切换用户后&#xff0c;可以通过exit命令退…

Redhat 7.9 安装dm8配置文档

Redhat 7.9 安装dm8配置文档 一 创建用户 groupadd -g 12349 dinstall useradd -u 12345 -g dinstall -m -d /home/dmdba -s /bin/bash dmdba passwd dmdba二 创建目录 mkdir /dm8 chown -R dmdba:dinstall /dm8三 配置/etc/security/limits.conf dmdba soft nproc 163…

二叉树结点关键字输出的递归算法实现

在计算机科学中&#xff0c;二叉树是一种重要的数据结构&#xff0c;广泛应用于各种算法和程序设计中。二叉树的遍历是二叉树操作中的基础问题之一&#xff0c;其目的是以某种规则访问二叉树的每个结点&#xff0c;使得每个结点被且仅被访问一次。给定一个具有n个结点的二叉树&…

idea端口占用

报错&#xff1a;Verify the connector‘s configuration, identify and stop any process that‘s listening on port XXXX 翻译&#xff1a; 原因&#xff1a; 解决&#xff1a; 一、重启大法 二、手动关闭 启动spring项目是控制台报错&#xff0c;详细信息如下&#xff…

C++的并发世界(四)——线程传参

1.全局函数作为传参入口 #include <iostream> #include <thread> #include <string>void ThreadMain(int p1,float p2,std::string str) {std::cout << "p1:" << p1 << std::endl;std::cout << "p2:" <<…

css3之动画animation

动画animation 一.优点二.定义和使用三.动画序列和解释四.常见属性及解释1.常见属性及应用2.速度曲线细节 五.简写&#xff08;名字和时间不能省略&#xff09;&#xff08;持续时间在何时开始的时间前&#xff09;&#xff08;简写中无animation-play-state)六.例子1.大数据热…

图神经网络实战(7)——图卷积网络(Graph Convolutional Network, GCN)详解与实现

图神经网络实战&#xff08;7&#xff09;——图卷积网络详解与实现 0. 前言1. 图卷积层2. 比较 GCN 和 GNN2.1 数据集分析2.2 实现 GCN 架构 小结系列链接 0. 前言 图卷积网络 (Graph Convolutional Network, GCN) 架构由 Kipf 和 Welling 于 2017 年提出&#xff0c;其理念是…

HCIA-RS基础-以太网设备工作原理

目录 以太网设备工作原理1. HUB 的工作原理2. L2 交换机的工作原理3. L3 交换机的工作原理 总结 以太网设备工作原理 以太网是一种常用的局域网技术&#xff0c;用于在计算机网络中传输数据。在以太网中&#xff0c;有几种常见的设备&#xff0c;包括 HUB、L2 交换机和 L3 交换…

Java入门学习Day04

本篇文章主要介绍了&#xff1a;如何输入数据、字符串拼接、自增自减运算符、类型转换&#xff08;int&#xff0c;double等&#xff09; CSDN&#xff1a;码银 公众号&#xff1a;码银学编程 一、键盘输入练习 Scanner是Java中的一个类&#xff0c;用于从控制台或文件中读…

java学习3

目录 面向对象——多态 什么是多态 多态的前提 多态有什么好处&#xff1f; 包 1.包的作用? 2.包名书写的规则? 3.什么时候需要导包?什么时候不需要导包? final 修饰符 权限修饰符 4种权限修饰符的范围 抽象方法和抽象类 接口 接口的定义和使用 接口中成员的特点…

JavaScript变量对象详解

正文 在JavaScript中&#xff0c;变量对象是执行上下文中的一个重要概念&#xff0c;它负责存储函数中的变量、函数声明和形参。了解变量对象对于理解JavaScript的作用域、作用域链以及变量的声明和提升至关重要。 1. 变量对象的定义 变量对象是在执行上下文创建阶段被创建的&a…

【前端面试3+1】04浏览器存储、flex布局属性和常用指令、 promise和async await区别、【验证回文串】

一、浏览器存储 1.1类型&#xff1a; 浏览器数据存储的方式有以下几种&#xff1a; Cookie&#xff1a;小型文本文件&#xff0c;存储在用户计算机上&#xff0c;可以通过浏览器传输到服务器。Web Storage&#xff1a;包括LocalStorage和SessionStorage&#xff0c;可以在浏览器…

【JavaEE初阶系列】——一万字带你了解 JUC常见类 以及 线程安全集合类(哈希表)

目录 &#x1f6a9;JUC(java.util.concurrent) 的常见类 &#x1f388;Callable 接口 &#x1f308;理解 Callable(相关面试题) &#x1f308;理解 FutureTask &#x1f4dd;线程创建方式 &#x1f388; ReentrantLock可重入锁 &#x1f308;ReentrantLock 优势&#x…

【Python BUG】ModuleNotFoundError: No module named ‘streamlit.cli‘

问题 streamlit做大模型前端demo&#xff0c;安装后不好使。 解决方案 参考&#xff1a; https://zhuanlan.zhihu.com/p/656164361 找到下面文件&#xff1a; 替换、修改内容&#xff1a; # from streamlit.cli import main from streamlit.web.cli import main原来是上边…

JavaScript_与html结合方式

JavaScript_语法 ECMAScript&#xff1a;客户端脚本语言的标准 1.基本语法 1.1 与html结合方式&#xff08;2种&#xff09; 1. 内部JS 定义<script>,标签体内容就是js代码 2. 外部JS 定义<script>,通过src属性引入外部的 js文件 注意&#xff1a; 1.<script>…

【DPU微知识】NVIDIA-BlueFiled DPU概念之:BFB是什么?

BFB是BlueField Boot Stream的缩写&#xff0c;由Bootloader、Linux OS、Romfs组成。本质&#xff1a;bootload、系统、文件系统。&#xff08;其实就是DPU的上装类比标准host的grub、linux、文件系统&#xff0c;类似做Linux移植时候构建的最小文件系统的三件套差不多&#xf…

3D模型格式转换工具HOOPS Exchange如何将3D文件加载到PRC数据结构中?

HOOPS Exchange是一款高效的数据访问工具&#xff0c;专为开发人员设计&#xff0c;用于在不同的CAD&#xff08;计算机辅助设计&#xff09;系统之间进行高保真的数据转换和交换。由Tech Soft 3D公司开发&#xff0c;它支持广泛的CAD文件格式&#xff0c;包括但不限于AutoCAD的…

uniapp项目-懂你找图

文章目录 项目介绍项目搭建1.项目创建 2.新增tabbar3引入字体图标 uni-ui介绍使用 uni-api介绍 首页模块功能分析搭建子页面分段器介绍 封装自己的异步请求为什么要封装封装的思路 编写首页-推荐页面分页功能 专辑列表获取专辑详情数据 项目介绍 微信小程序&#xff0c;提供图…

苹果开发者账号注册后生成开发证书和发布证书的流程解析

转载&#xff1a;注册苹果开发者账号的方法 在2020年以前&#xff0c;注册苹果开发者账号后&#xff0c;就可以生成证书。 但2020年后&#xff0c;因为注册苹果开发者账号需要使用Apple Developer app注册开发者账号&#xff0c;所以需要缴费才能创建ios证书了。 所以新政策出…