【分布式技术专题】「分布式技术架构」手把手教你如何开发一个属于自己的Redis延时队列的功能组件

news2025/1/24 2:20:54

手把手教你如何开发一个属于自己的延时队列的功能组件

  • 前提介绍
  • 解决痛点
  • 延时队列组件的架构
    • 延时队列组件的初始化流程
    • 延时队列组件的整体核心类架构
    • 延时队列组件的整体核心类功能
  • 延时队列的开发组件
    • 延迟队列的机制配置初始化类
      • 源码 - DelayedQueueConfiguration
      • Redission客户端的实现
        • 源码 - DelayedRedissionClientTool
        • 核心方法源码分析
          • Offer方法存储元素 - 添加阻塞队列-元素
          • poll方法获取元素 - 从阻塞队列中拉取数据
          • 定义和初始化执行线程池
          • 定义和初始化轮询线程池
            • 源码 - DelayedThreadPoolExecutor
          • 延迟队列机制支持Redis客户端
            • 源码 - DelayedRedisClientSupport
        • RedissonClientTool的工具的封装操作
          • 源码 - RedissonClientTool
        • 辅助类定义处理
          • DelayedThreadPoolSupport
          • 线程池的构建和初始化
      • 延时队列启动DelayedBootstrapInitializer
        • 源码 - DelayedBootstrapInitializer
      • 注入监听器+异常处理器
      • init初始化操作机制控制
      • 获取延时队列数据信息的模型
        • 执行线程组机制
    • 定义延时队列的消费接口
      • 定义延时队列的消费接口扩展接口
      • DelayedBootstrapRunnable
      • 延时队列的使用案例
        • 延时队列投递数据方
        • 延时队列消费数据方:
        • 延时轮询线程异常处理器:
  • 问题反馈

前提介绍

针对于目前,系统中的延时队列的开发复杂度以及统一化管理没有完成相关的标准,故此本人封装了一款,基于Redssion的框架为基础的也是基于我们现在framework为基础的延时队列框架开发机制组件,方便未来大家去开发属于自己的延时队列的开发规范以及开发成本!

解决痛点

  • 基于原始的redis失效的EntryExpiredListener的定时监听器,因为考虑周期性和性能和延迟问题过大,所以有了本次版本组件封装的优化

  • 简化开发,系统多出使用原生的redission客户端,因为这无形中给开发人员带来了很大的工作量,考虑未来的开发过程中会存在很多延时队列的场景

  • 无标准,使用的延时开发实现原理的种类非常的多,有内存机制的延时队列、消息队列的延时实现、redis的延时队列,为了达成标准化。

  • 统一化管理,防止问题重复出现或者多点问题出现机制。

延时队列组件的架构

  • 延时队列采用redis 大key或者业务组、业务类型进行划分出不同的分割领域,每个组都是属于相互隔离。

  • 自己消费自己的数据信息以及异常处理和轮询和执行机制

在这里插入图片描述

延时队列组件的初始化流程

  • 主要针对于轮询线程、执行线程的初始化

  • 主要针对于注册监听器、异常处理器机制

在这里插入图片描述

延时队列组件的整体核心类架构

在这里插入图片描述

延时队列组件的整体核心类功能

在这里插入图片描述


延时队列的开发组件

延迟队列的机制配置初始化类

DelayedQueueConfiguration:主要集中于延时队列的配置参数类,主要用于定义针对于初始化一些基础核心的基础类服务组件的集合。

源码 - DelayedQueueConfiguration

@Configuration
@ComponentScan(basePackages = "com.hyts.assemble.redisdelayer")
public class DelayedQueueConfiguration {
    /**
     * redission客户端的实现
     * @return
     */
    @Bean
    public DelayedRedissionClientTool delayedRedissionClientTool(){
        return new DelayedRedissionClientTool();
    }
    /**
     * 执行操作处理机制(考虑是IO密集型或者混合密集型机制) - 循环监控线程机制
     * @return
     */
    @Bean("delayedExecuteThreadPoolExecutor")
    public Executor delayedExecuteThreadPoolExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor =
                DelayedThreadPoolExecutor.initParameter("delayedExecuteThreadPoolExecutor");
        // 因为可以定制化线程数量机制,是否考虑延迟机制,待议 TODO
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }
    /**
     * 执行操作处理机制(考虑是IO密集型或者混合密集型机制) 异步 执行线程机制
     * @return
     */
    @Bean("delayedExecuteThreadPoolCycle")
    public Executor delayedExecuteThreadPoolCycle() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor =
                DelayedThreadPoolExecutor.initParameter("delayedExecuteThreadPoolCycle");
        threadPoolTaskExecutor.setQueueCapacity(0);
        // 系统暂时仅仅支持核心书个组,直接执行,不会存放队列数据信息
        threadPoolTaskExecutor.setMaxPoolSize(threadPoolTaskExecutor.getMaxPoolSize());
        threadPoolTaskExecutor.setCorePoolSize(threadPoolTaskExecutor.getCorePoolSize());
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }
    /**
     * 延迟线程池支持机制
     * @return
     */
    @Bean
    public DelayedThreadPoolSupport delayedThreadPoolSupport(@Autowired @Qualifier("delayedExecuteThreadPoolExecutor") Executor execute,
                                                             @Autowired @Qualifier("delayedExecuteThreadPoolCycle") Executor recycle){
        return new DelayedThreadPoolSupport(execute,recycle);
    }
    /**
     * 延迟队列机制支持Redis客户端
     * @return
     */
    @Bean
    public DelayedRedisClientSupport delayedRedisClientSupport(){
        return new DelayedRedisClientSupport(delayedRedissionClientTool());
    }
    /**
     * 线程池的构建和初始化
     * @return
     */
    @Bean(initMethod = "init")
    public DelayedBootstrapInitializer delayedThreadPoolExecutor(){
        return new DelayedBootstrapInitializer();
    }

    @Bean
    @ConditionalOnMissingBean(RedissonClientTool.class)
    public RedissonClientTool redissonClientTool(RedissonClient redissonClient) {
        return new RedissonClientTool(redissonClient);
    }
}

主要包含了一下几个部件:

  • DelayedRedissionClientTool:Redission客户端的实现
  • delayedExecuteThreadPoolExecutor:执行操作处理机制(考虑是IO密集型或者混合密集型机制) - 循环监控线程机制
  • delayedExecuteThreadPoolCycle:执行操作处理机制(考虑是IO密集型或者混合密集型机制) 异步 执行线程机制
  • DelayedThreadPoolSupport:延迟线程池支持机制
  • DelayedRedisClientSupport:延迟线程池支持机制

Redission客户端的实现

DelayedRedissionClientTool主要属于Redisson延时队列客户端实现类,主要包含了相关的对应的处理维护延时队列的元素数据信息操作类。

源码 - DelayedRedissionClientTool

@AutoConfigureAfter(value = RedissonClientTool.class)
@Slf4j
public class DelayedRedissionClientTool  {
    /**
     * redissionCLientTool工具机制
     */
    @Autowired
    RedissonClientTool redissonClientTool;
    /**
     * 自动注册
     */
    public DelayedRedissionClientTool() {
    }
    /**
     * 手动注册
     * @param redissonClientTool
     */
    public DelayedRedissionClientTool(RedissonClientTool redissonClientTool) {
        this.redissonClientTool = redissonClientTool;
    }

    /**
     * 添加阻塞队列-元素
     * @param <T>
     */
    public <T> void offer(ExecuteInvokerEvent<T> executeInvokerEvent) {
        //预先进行构建初始化参数条件机制
        executeInvokerEvent.preCondition(executeInvokerEvent);
        redissonClientTool.addDelayQueueElement(Objects.requireNonNull(executeInvokerEvent).getBizGroup(),
                executeInvokerEvent,executeInvokerEvent.getDelayedTime(),executeInvokerEvent.getTimeUnit());
    }
    /**
     * 获取相关的
     * @param executeInvokerEvent
     * @param <T>
     * @return
     */
    public <T> RBlockingQueue<T> takeBlockingQueue(ExecuteInvokerEvent<T> executeInvokerEvent) {
        return redissonClientTool.getRedissonClient().getBlockingQueue(executeInvokerEvent.getBizGroup());

    }
    /**
     * 操作梳理
     * @param trBlockingQueue
     * @param <T>
     * @return
     * @throws InterruptedException
     */
    public <T> ExecuteInvokerEvent<T> poll(RBlockingQueue<T> trBlockingQueue) throws InterruptedException {
        return (ExecuteInvokerEvent<T>) trBlockingQueue.take();
    }
}

核心方法源码分析

Offer方法存储元素 - 添加阻塞队列-元素
public <T> void offer(ExecuteInvokerEvent<T> executeInvokerEvent) {
      //预先进行构建初始化参数条件机制
      executeInvokerEvent.preCondition(executeInvokerEvent);
      redissonClientTool.addDelayQueueElement(Objects.requireNonNull(executeInvokerEvent).getBizGroup(),
                executeInvokerEvent,executeInvokerEvent.getDelayedTime(),executeInvokerEvent.getTimeUnit());
}
poll方法获取元素 - 从阻塞队列中拉取数据
public <T> ExecuteInvokerEvent<T> poll(RBlockingQueue<T> trBlockingQueue) throws InterruptedException {
      return (ExecuteInvokerEvent<T>) trBlockingQueue.take();
}
定义和初始化执行线程池

执行操作处理机制(考虑是IO密集型或者混合密集型机制) - 循环监控线程机制,执行线程池(公共默认)

@Bean("delayedExecuteThreadPoolExecutor")
    public Executor delayedExecuteThreadPoolExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor =
                DelayedThreadPoolExecutor.initParameter("delayedExecuteThreadPoolExecutor");
        // 因为可以定制化线程数量机制,是否考虑延迟机制,待议 TODO
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }
定义和初始化轮询线程池

主要负责轮询获取对应的redis服务队列中的数据的线程所在的线程池。

 @Bean("delayedExecuteThreadPoolCycle")
    public Executor delayedExecuteThreadPoolCycle() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor =
                DelayedThreadPoolExecutor.initParameter("delayedExecuteThreadPoolCycle");
        threadPoolTaskExecutor.setQueueCapacity(0);
        // 系统暂时仅仅支持核心书个组,直接执行,不会存放队列数据信息
        threadPoolTaskExecutor.setMaxPoolSize(threadPoolTaskExecutor.getMaxPoolSize());
        threadPoolTaskExecutor.setCorePoolSize(threadPoolTaskExecutor.getCorePoolSize());
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }

其中内部可以看到调用了DelayedThreadPoolExecutor.initParameter的方法进行控制和初始化对应的线程池,接下来,我们来看看该线程池方法以及他 的作用是什么?

源码 - DelayedThreadPoolExecutor
public class DelayedThreadPoolExecutor {

    /**
     * 获取到服务器的cpu内核:逻辑内核核心数
     */
    private static int DEFAULT_THREAD_CORE_BASE_SIZE = Runtime.getRuntime().availableProcessors();

    /**
     * IO密集型机制控制*2
     */
    private static int DEFAULT_THREAD_CORE_SIZE_IO_TYPE = DEFAULT_THREAD_CORE_BASE_SIZE<<1;

    /**
     * 序号分配器
     */
    private static  AtomicInteger atomicInteger = new AtomicInteger();

    /**
     * 初始化参数信息
     * @return
     */
    public static ThreadPoolTaskExecutor initParameter(String threadGroup){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(DEFAULT_THREAD_CORE_SIZE_IO_TYPE);//核心池大小
        executor.setMaxPoolSize(DEFAULT_THREAD_CORE_SIZE_IO_TYPE<<4);//最大线程数 = 核心*核心池大小;
        executor.setQueueCapacity(1000);//队列程度
        executor.setKeepAliveSeconds(30);//线程空闲时间
        executor.setThreadGroupName(threadGroup);
        executor.setThreadFactory(r -> new Thread(r,String.format("%s-%s",threadGroup,atomicInteger.getAndDecrement())));
        executor.setThreadNamePrefix(threadGroup+"-");//线程前缀名称
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());//配置拒绝策略
        return executor;
    }
}

上面可以看出来它主要属于一个公共方法进行控制我们的通用线程池的参数类。

主要是讲对应的Spring的ThreadPoolTaskExecutor的对象实现类进行模板化的一个功能。降低使用着的开发量。

延迟队列机制支持Redis客户端

延迟队列机制支持Redis客户端支持类,主要目的是为了作为一个讲对应的Redis客户端类的静态引用操作。

 
    @Bean
    public DelayedRedisClientSupport delayedRedisClientSupport(){
        return new DelayedRedisClientSupport(delayedRedissionClientTool());
    }
源码 - DelayedRedisClientSupport
public class DelayedRedisClientSupport {
    @Getter
    private static DelayedRedissionClientTool delayedRedissionClientTool;

    /**
     * 延迟队列控制redis服务机制
     * @param delayedRedissionClientTool
     */
    public DelayedRedisClientSupport(DelayedRedissionClientTool delayedRedissionClientTool) {
        DelayedRedisClientSupport.delayedRedissionClientTool = delayedRedissionClientTool;
    }
}

RedissonClientTool的工具的封装操作

  @Bean
    @ConditionalOnMissingBean(RedissonClientTool.class)
    public RedissonClientTool redissonClientTool(RedissonClient redissonClient) {
        return new RedissonClientTool(redissonClient);
    }
源码 - RedissonClientTool
@Slf4j
public class RedissonClientTool {

    private RedissonClient redissonClient;

    public RedissonClientTool(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }

    public <T> void addDelayQueueElement(String key, T t, long delay, TimeUnit timeUnit) {
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(key);
        RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
        delayedQueue.offer(t, delay, timeUnit);
    }

    public <T> T takeDelayQueueElement(String key) {
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(key);
        T t = null;
        try {
            t = blockingFairQueue.take();
        } catch (InterruptedException e) {
            log.error("takeDelayQueueElement error key: " + key, e);
        }
        return t;
    }


    /**
     * 阻塞队列添加元素
     * @param key
     * @param t
     * @param <T>
     */
    public <T> void addBlockingQueueElement(String key, T t) {
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(key);
        blockingFairQueue.offer(t);
    }

    /**
     *
     * 取出队列的元素且删除
     * @param key
     * @param t
     * @param <T>
     * @return
     */
    public <T> T pollBlockQueueElement(String key, T t) {
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(key);
        return blockingFairQueue.poll();
    }

    /**
     *
     * 取出队列的元素但是不删除
     * @param key
     * @param t
     * @param <T>
     * @return
     */
    public <T> T peekBlockQueueElement(String key, T t) {
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(key);
        return blockingFairQueue.peek();
    }


    /**
     * 队列添加元素
     * @param key
     * @param t
     * @param <T>
     */
    public <T> void addQueueElement(String key, T t) {
        RQueue<T> queue = redissonClient.getQueue(key);
        queue.offer(t);
    }

    /**
     *
     * 取出队列的元素且删除
     * @param key
     * @param t
     * @param <T>
     * @return
     */
    public <T> T pollQueueElement(String key, T t) {
        RQueue<T> queue = redissonClient.getQueue(key);
        return queue.poll();
    }


    public RedissonClient getRedissonClient () {
        return this.redissonClient;
    }

}

辅助类定义处理

DelayedThreadPoolSupport

主要通过的对应的延时队列线程池的工具支持类,主要包含了对应的这两种线程池的引用操作处理模型,如下图所示。

public class DelayedThreadPoolSupport {

    /**
     * 任务执行线程机制
     */
    @Getter
    private static Executor taskExecuteThread;

    /**
     * 任务轮询线程机制
     */
    @Getter
    private static Executor taskRecycleThread;


    /**
     * 操作处理机制
     * @param taskExecuteThread
     * @param taskRecycleThread
     */
    public DelayedThreadPoolSupport(Executor taskExecuteThread, Executor taskRecycleThread) {
        DelayedThreadPoolSupport.taskExecuteThread = taskExecuteThread;
        DelayedThreadPoolSupport.taskRecycleThread = taskRecycleThread;
    }
}
线程池的构建和初始化

主要通过线程池进行构建对应的启动初始化对象实现类,用于绑定和初始化所有需要进行延时队列监听的线程。

@Bean(initMethod = "init")
public DelayedBootstrapInitializer delayedThreadPoolExecutor(){
       return new DelayedBootstrapInitializer();
}

延时队列启动DelayedBootstrapInitializer

  • 开始初始化加载完成系统内部所有的相关的延迟队列监听上下文接口服务数据
  • 开始开展完成线程任务分配为每个分组的监听器以及任务队列分配资源
  • 开始生产相关的监听绑定关系机制
  • 开始初始化相关的异常信息处理机制
  • 初始化线程机制

源码 - DelayedBootstrapInitializer

@Slf4j
public class DelayedBootstrapInitializer {

    @Setter
    @Getter
    @DelayedQueueListener(value="delayedListenerContextMap")
    Map<String, EventExecutableInvokerListener> delayedListenerContextMap = Maps.newHashMap();

    @Setter
    @Getter
    @DelayedQueueExceptionHandler(value="delayedExceptionHandlerMap")
    Map<String, DelayedExceptionHandler> delayedExceptionHandlerMap = Maps.newHashMap();
    
    /**
     * 初始化操作机制控制
     */
    public void init(){
        log.info("启动初始化加载并完成所有相关延迟启动初始化加载并完成所有相关延迟" +
                "系统中用于侦听上下文接口服务数据的队列 : {}",delayedListenerContextMap);
        log.info("开始完成线程任务分配,并为每个组的侦听器和任务队列分配资源");
        if(MapUtils.isEmpty(delayedListenerContextMap)){
            log.info("未找到任务侦听信息。在springcontext管理的上下文中," +
                    "请检查是否有关于实现的接口" +
                    "EventExecutableInvokerListener,以及相关@DelayedQueueListener");
            return ;
        }
        log.info("启动与生产相关的侦听绑定机制");
        Map<String,List<EventExecutableInvokerListener>> getAnnotationMetadataGroup =
                delayedListenerContextMap.values().stream().collect(Collectors.groupingBy(DelayedBootstrapInitializer::getAnnotationMetadataGroupListener));

        log.info("开始初始化相关的异常信息处理机制");
        Map<String,List<DelayedExceptionHandler>> delayedExceptionHandlerMapGroup =
                delayedExceptionHandlerMap.values().stream().collect(Collectors.groupingBy(DelayedBootstrapInitializer::getAnnotationMetadataGroupExceptionHandler));
        if(MapUtils.isNotEmpty(getAnnotationMetadataGroup)){
            Executor executor = DelayedThreadPoolSupport.getTaskRecycleThread();
            log.info("启动资源分配机制");
            //推荐同一个组里面采用一个线程池进行处理机制
            getAnnotationMetadataGroup.entrySet().forEach(param->{
                log.info("初始化线程机制 {}:",param.getValue());
                executor.execute(new DelayedBootstrapRunnable(param.getKey(),param.getValue(),
                        DelayedBootstrapInitializer.getExecutorByGroup(param.getValue()),
                        new ExecutableExceptionHandler(delayedExceptionHandlerMapGroup.get(param.getKey()))));
            });
        }else{
            log.warn("资源转换失败!无法执行资源执行机制");
        }
    }

    /**
     * 支持动态添加延时队列控制
     */
//    public void addExecuteDelayeQueue(ExecuteDelayedQueue executeDelayedQueue){
//        Executor executor = DelayedThreadPoolSupport.getTaskRecycleThread();
//        executor.execute(new DelayedBootstrapRunnable(executeDelayedQueue.getQueueName(),param.getValue(),
//                DelayedBootstrapInitializer.getExecutorByGroup(executeDelayedQueue.getValue()),
//                new ExecutableExceptionHandler(Lists.newArrayList(new DefaultSampleDelayedExceptionHandler()))));
//    }

    /**
     * @param eventExecutableInvokerListener
     * @return
     */
    public static String getAnnotationMetadataGroupListener(EventExecutableInvokerListener eventExecutableInvokerListener){
        return getAnnotationMetadataGroup(eventExecutableInvokerListener,DelayedQueueListener.class);
    }


    public static String getAnnotationMetadataGroupExceptionHandler( DelayedExceptionHandler delayedExceptionHandler){
        return getAnnotationMetadataGroup(delayedExceptionHandler,DelayedQueueExceptionHandler.class);
    }

    /**
     * 获取相关的组信息
     * @param object
     * @return
     */
    public static String getAnnotationMetadataGroup(Object object,Class delayedQueueListenerClass){
        Object annotationInstance = object.getClass().getAnnotation(delayedQueueListenerClass);
        if(annotationInstance instanceof DelayedQueueListener) {
            DelayedQueueListener delayedQueueListener = (DelayedQueueListener)annotationInstance;
            if(Objects.isNull(annotationInstance)){
                return Strings.EMPTY;
            }else{
                return delayedQueueListener.group();
            }
        }
        else if(annotationInstance instanceof DelayedQueueExceptionHandler) {
            DelayedQueueExceptionHandler delayedExceptionHandler = (DelayedQueueExceptionHandler)annotationInstance;
            if(Objects.isNull(annotationInstance)){
                return Strings.EMPTY;
            }else{
                return delayedExceptionHandler.group();
            }
        }
        return Strings.EMPTY;
    }


    /**
     * 执行线程组机制
     * @return
     */
    public static Executor getExecutorByGroup(List<EventExecutableInvokerListener> eventExecutableInvokerListeners){
        return eventExecutableInvokerListeners.stream().map(EventExecutableInvokerListener::getExecutor).
                filter(Objects::nonNull).findAny().orElse(null);
    }
}

注入监听器+异常处理器

由于Spring框架可以帮我自动进行获取对象模型注入的数据集合,此部分我们采用的是Map


@Setter
@Getter
@DelayedQueueListener(value="delayedListenerContextMap")
Map<String, EventExecutableInvokerListener> delayedListenerContextMap = Maps.newHashMap();

@Setter
@Getter
@DelayedQueueExceptionHandler(value="delayedExceptionHandlerMap")
Map<String, DelayedExceptionHandler> delayedExceptionHandlerMap = Maps.newHashMap();

init初始化操作机制控制

主要是进行初始化操作init方法,之后进行获取对应的监听器以及对象,并将这些对象直接进行注入到对应的轮询线程和执行任务的线程中,方便我们整体的延时队列进行运行处理操作。

    public void init(){
        log.info("启动初始化加载并完成所有相关延迟启动初始化加载并完成所有相关延迟" +
                "系统中用于侦听上下文接口服务数据的队列 : {}",delayedListenerContextMap);
        log.info("开始完成线程任务分配,并为每个组的侦听器和任务队列分配资源");
        if(MapUtils.isEmpty(delayedListenerContextMap)){
            log.info("未找到任务侦听信息。在springcontext管理的上下文中," +
                    "请检查是否有关于实现的接口" +
                    "EventExecutableInvokerListener,以及相关@DelayedQueueListener");
            return ;
        }
        log.info("启动与生产相关的侦听绑定机制");
        Map<String,List<EventExecutableInvokerListener>> getAnnotationMetadataGroup =
                delayedListenerContextMap.values().stream().collect(Collectors.groupingBy(DelayedBootstrapInitializer::getAnnotationMetadataGroupListener));
        log.info("开始初始化相关的异常信息处理机制");
        Map<String,List<DelayedExceptionHandler>> delayedExceptionHandlerMapGroup =
                delayedExceptionHandlerMap.values().stream().collect(Collectors.groupingBy(DelayedBootstrapInitializer::getAnnotationMetadataGroupExceptionHandler));
        if(MapUtils.isNotEmpty(getAnnotationMetadataGroup)){
            Executor executor = DelayedThreadPoolSupport.getTaskRecycleThread();
            log.info("启动资源分配机制");
            //推荐同一个组里面采用一个线程池进行处理机制
            getAnnotationMetadataGroup.entrySet().forEach(param->{
                log.info("初始化线程机制 {}:",param.getValue());
                executor.execute(new DelayedBootstrapRunnable(param.getKey(),param.getValue(),
                        DelayedBootstrapInitializer.getExecutorByGroup(param.getValue()),
                        new ExecutableExceptionHandler(delayedExceptionHandlerMapGroup.get(param.getKey()))));
            });
        }else{
            log.warn("资源转换失败!无法执行资源执行机制");
        }
    }

获取延时队列数据信息的模型

主要通过延时队列处理类上面的注解的元数据信息,获取注解的分组Group操作属性。

  /**
     * @param eventExecutableInvokerListener
     * @return
     */
    public static String getAnnotationMetadataGroupListener(EventExecutableInvokerListener eventExecutableInvokerListener){
        return getAnnotationMetadataGroup(eventExecutableInvokerListener,DelayedQueueListener.class);
    }


    public static String getAnnotationMetadataGroupExceptionHandler( DelayedExceptionHandler delayedExceptionHandler){
        return getAnnotationMetadataGroup(delayedExceptionHandler,DelayedQueueExceptionHandler.class);
    }

    /**
     * 获取相关的组信息
     * @param object
     * @return
     */
    public static String getAnnotationMetadataGroup(Object object,Class delayedQueueListenerClass){
        Object annotationInstance = object.getClass().getAnnotation(delayedQueueListenerClass);
        if(annotationInstance instanceof DelayedQueueListener) {
            DelayedQueueListener delayedQueueListener = (DelayedQueueListener)annotationInstance;
            if(Objects.isNull(annotationInstance)){
                return Strings.EMPTY;
            }else{
                return delayedQueueListener.group();
            }
        }
        else if(annotationInstance instanceof DelayedQueueExceptionHandler) {
            DelayedQueueExceptionHandler delayedExceptionHandler = (DelayedQueueExceptionHandler)annotationInstance;
            if(Objects.isNull(annotationInstance)){
                return Strings.EMPTY;
            }else{
                return delayedExceptionHandler.group();
            }
        }
        return Strings.EMPTY;
    }

执行线程组机制

public static Executor getExecutorByGroup(List<EventExecutableInvokerListener> eventExecutableInvokerListeners){
     return eventExecutableInvokerListeners.stream().map(EventExecutableInvokerListener::getExecutor).
            filter(Objects::nonNull).findAny().orElse(null);
}

定义延时队列的消费接口

调用延时队列的执行抽象接口处理模型

@FunctionalInterface
public interface ExecutableInvokerListener<P,R>  {

    /**
     * 执行方法
     * @param param 返回值为以后callable使用
     * @return
     */
    R handle(P param);
}

定义延时队列的消费接口扩展接口

public interface EventExecutableInvokerListener<P,R> extends ExecutableInvokerListener<ExecuteInvokerEvent <P>,R> {

    /**
     * 延时偏移量
     */
    long DEFAULT_DELAYED_OSFFET = 10;

    /**
     * 延时超时时间时间戳
     */
    TimeUnit DEFAULT_DELAYED_TIMEUNIT = TimeUnit.SECONDS;

    /**
     * 是否可以执行异步操作(暂不支持)
     */
    boolean DEFAULT_IS_ASYNC_FLAG = Boolean.TRUE;

    /**
     * 暂时不支持重试机制,会造成数据重复执行机制,主要面向与执行失败后的重试机制(暂不支持)
     */
    int DEFAULT_RETRY_NUM = 0;

    /**
     * 存放在同一个线程执行
     */
    String DEFAULT_BIZ_GROUP = "DEFAULT_GROUP";

    /**
     * 如果没有定义直接采用默认线程池进行执行
     */
     Executor getExecutor();
}

DelayedBootstrapRunnable

主要用于处理对应的DelayedBootstrapRunnable的控制对象模型机制,用于轮询查询获取redis队列种的数据信息,之后回调给业务端的Listener监听器操作。

@RequiredArgsConstructor
@Slf4j
public  class DelayedBootstrapRunnable implements Runnable{

    /**
     * 直接传递相关的执行客户端访问器
     */
    public DelayedRedissionClientTool delayedRedissionClientTool = DelayedRedisClientSupport.getDelayedRedissionClientTool();
    /**
     * 绑定的线程组,只会执行相关的线程组之间的关系机制
     */
    public final String bizGroup;
    /**
     * 注入参数进入
     */
    public final List<EventExecutableInvokerListener> eventExecutableInvokerListeners;
    /**
     * 执行线程池
     */
    public final Executor executorThreadPool;
    /**
     * 异常信息控制
     */
    public final ExecutableExceptionHandler exceptionHandlers;
    /**
     * 启动服务处理机制
     */
    @Override
    public void run() {
        try {
            RBlockingQueue<ExecuteInvokerEvent> blockingQueue = delayedRedissionClientTool.takeBlockingQueue(new ExecuteInvokerEvent(bizGroup));
            Executor executor = Objects.isNull(executorThreadPool) ? DelayedThreadPoolSupport.getTaskExecuteThread() : executorThreadPool;
            Thread.currentThread().setUncaughtExceptionHandler(exceptionHandlers);
            for(;;) {
               try{
                   ExecuteInvokerEvent data =  delayedRedissionClientTool.poll(blockingQueue);
                   log.info("侦听队列任务组:{},获得值:{}", bizGroup, data);
                   log.info(MessageFormat.format("【1】Execute parse complete call: the execution time should be:{0,date,yyyy-MM-dd HH:mm:ss}," +
                                   "Actual execution time:{1,date,yyyy-MM-dd HH:mm:ss},createTime:{2,date,yyyy-MM-dd HH:mm:ss}",
                           data.getFiredTime(), new Date(),new Date(data.getCreateTime())));
                   executor.execute(() -> {
                       for(EventExecutableInvokerListener eventExecutableInvokerListener : eventExecutableInvokerListeners){
                           eventExecutableInvokerListener.handle(data);
                       }
                   });
               }catch (Exception e){
                   log.error("无法执行处理",e);
               }
            }
        } catch (Exception e) {
            log.error("无法执行处理",e);
//            throw new RuntimeException(e);
        }
    }
}

延时队列的使用案例

延时队列投递数据方


@Autowired(required = false)
public DelayedRedissionClientTool delayedRedissionClientTool;
public void testProducerElement(){
        AtomicInteger atomicInteger = new AtomicInteger();
        IntStream.range(0,200).forEach(param->{
            log.info("开始投递数据信息");
             // 业务编号必须传入,为了去重;此外分组必须穿,如同mq的topic
            ExecuteInvokerEvent executeInvokerEvent = new ExecuteInvokerEvent(String.valueOf(atomicInteger.incrementAndGet()),"TEST_GROUP");
            executeInvokerEvent2.setDelayedTime(10L); // 延时时长度
            executeInvokerEvent2.setDataModel("asdasda"); //传输数据模型。泛型类型
            executeInvokerEvent2.setTimeUnit(TimeUnit.SECONDS); // 延时时间单位
            delayedRedissionClientTool.offer(executeInvokerEvent); //数据存储
        });
    }

延时队列消费数据方:

@Slf4j
@DelayedQueueListener(value="delayedQueueTest",group="TEST_GROUP")
public class DelayedQueueTest implements EventExecutableInvokerListener<ExecuteInvokerEvent<Object>,Object> {
    /**
     * 可以自定义线程池,但是一个组中,只会采用其中一个线程池去执行,防止过多使用资源
     * @return
     */
    @Override
    public Executor getExecutor() {
        return null;
    }
    /**
     * 任务执行机制控制服务
     * @param param 返回值为以后callable使用
     * @return
     */
    @Override
    public Object handle(ExecuteInvokerEvent<ExecuteInvokerEvent<Object>> param) {
        try {
            System.out.println(MessageFormat.format("【1】执行解析完成调用:应该执行时间:{0,date,yyyy-MM-dd HH:mm:ss}," +
                    "实际执行时间:{1,date,yyyy-MM-dd HH:mm:ss},创建时间:{2,date,yyyy-MM-dd HH:mm:ss}",param.getFiredTime(), new Date(),new Date(param.getCreateTime())));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}

延时轮询线程异常处理器:

 */
@DelayedQueueExceptionHandler(value="delayedHandler",group="TEST_GROUP")
public class DelayedTestQueueExceptionHandler implements DelayedExceptionHandler {
    @Override
    public void catchException(Throwable e, Thread currentThread) {
        System.out.println("asdasdasda---------------------");
//        e.printStackTrace();
    }
}

问题反馈

  1. 大家是不是觉得非常便利开发相关的延迟队列?
  2. 异常处理机制待优化
  3. 性能提升带优化
  4. 循环线程属于非常痛点和薄弱的问题

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/468407.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

排序算法 - 冒泡排序

文章目录 冒泡排序介绍冒泡排序实现复杂度和稳定性冒泡排序时间复杂度冒泡排序稳定性 代码实现核心&注意结尾 每日一道算法提高脑力&#xff0c;今天是第一天&#xff0c;来个最简单的算法–冒泡排序。 冒泡排序介绍 它是一种较简单的排序算法。它会遍历若干次要排序的数列…

jQuery知识点一

一、 jQuery 介绍 1.jQuery的概念&#xff1a; jQuery 是一个快速、简洁的 JavaScript 库&#xff0c;其设计的宗旨是“write Less&#xff0c;Do More”&#xff0c;即倡导写更少的代码&#xff0c;做更多的事情。 j 就是 JavaScript&#xff1b; Query 查询&#xff1b; 意思…

学习 Python 之 Pygame 开发魂斗罗(十五)

学习 Python 之 Pygame 开发魂斗罗&#xff08;十五&#xff09; 给魂斗罗游戏加入Boss1. 分析boss2. 创建boss类3. 在主类中加载Boss4. 修改子弹类逻辑&#xff0c;让boss可以开火5. 修改主类逻辑&#xff0c;让boss正常开火 给魂斗罗游戏加入Boss 在上次的博客学习 Python 之…

如何在不重装系统的情况下换固态硬盘?

随着固态硬盘的价格不断下降&#xff0c;越来越多的计算机用户希望用固态硬盘替换老旧的机械硬盘以获得更好的性能。 但是常规方法就避免不了重装系统&#xff0c;用户配置文件、系统设置、个人文件和已安装的程序又需要重新配置一遍。此外&#xff0c;还可能重新遇到很多问题…

城市“一网统管”平台—智慧平安小区的场景应用

随着城市建设进程的不断加快&#xff0c;关于城市的智能化治理需求也随之增多。在国家发布的“十四五”规划中&#xff0c;已经明确指出&#xff0c;推进新型城市建设&#xff0c;推行城市运行一网统管。作为推动城市治理体系和治理能力现代化的重要探索&#xff0c;“一网统管…

Linux/Unix常见IO模型

阻塞&#xff08;Blocking I/O&#xff09;、非阻塞&#xff08;Non-Blocking I/O&#xff09;、IO多路复用&#xff08;I/O Multiplexing&#xff09;、 信号驱动 I/O&#xff08;Signal Driven I/O&#xff09;&#xff08;不常用&#xff09;和异步&#xff08;Asynchronous…

智能家居“落地者”:三翼鸟用场景方案持续链接大众消费

互联网分析沙龙(techxue)原创 作者 &#xff5c; 锡海 编辑 &#xff5c; 七喜 从上海车展再到AWE2023展会&#xff0c;只要有大型活动的地方&#xff0c;都能看到人潮汹涌的景象&#xff0c;久违的烟火气又回来了。数据显示&#xff0c;社会消费已出现较为强劲反弹&#xff0…

长知识了,mongo的时间居然这个样子

1、前言 最近一直在使用mongo数据库&#xff0c;前面文章也介绍了一直在做数据过期的事情&#xff0c; mongo中的数据过期时间之前在程序中增加了一个字段 【Springboot系列】项目启动时怎么给mongo表加自动过期索引 之前看到时间字段没有时区的信息&#xff0c;没有关注&a…

微服务注册中心选型:Zookeeper、Eureka、Nacos、Consul和Etcd

注册中心基本概念 什么是注册中心&#xff1f; 注册中心主要有三种角色&#xff1a; 服务提供者&#xff08;RPC Server&#xff09;&#xff1a;在启动时&#xff0c;向 Registry 注册自身服务&#xff0c;并向 Registry 定期发送心跳汇报存活状态。 服务消费者&#xff08…

怎么知道网站服务器有没有被攻击?

​  一个网站服务器遭到攻击可能会给企业带来巨大的金融损失&#xff0c;因此&#xff0c;企业需要及时发现服务器是否被攻击。但是&#xff0c;企业如何知道自己的服务器是否被攻击呢?下面&#xff0c;我们来看一些服务器被攻击的警告信号。 1.网络延迟增加 在网络攻击中&a…

记一次运气非常好的渗透到服务器的经历

平平无奇的客服平台 这个客服平台是有RCE的&#xff0c;如果上传到的不是oss服务器&#xff0c;存储在本地服务器的话 在返回端口的url是存在st2 root权限&#xff0c;由于是客服后台服务器&#xff0c;没有啥有用价值的信息 直接替换私钥连服务器 继续翻找有用的信息 配置文件…

Django性能监视工具django-silk的使用

django-silk 是一个轻量级的 Django 应用性能监视工具&#xff0c;可帮助您了解 Django 应用的性能瓶颈、数据库查询等问题。它可以使用在django前后端分离的项目中&#xff0c;直接通过请求后台API接口即可对性能进行监视。以下是 django-silk 的使用步骤&#xff1a; 1.安装…

资本认可 | 开源网安成为中国未来独角兽企业,引领软件安全不断发展

4月11日&#xff0c;第七届万物生长大会中国未来独角兽大会盛大召开&#xff0c;本次大会中国投资发展促进会创投专委会联合微链共同发布了《2023中国未来独角兽TOP100榜单》&#xff0c;开源网安成功入选榜单。 《2023中国未来独角兽TOP100榜单》瞄准近两年融资较为活跃或融资…

快速简单制作macOS Ventura系统ISO格式镜像

ISO格式的镜像其实没有什么制作难度&#xff0c;下面苹果系统之家教大家怎么快速简单制作ISO格式的镜像&#xff0c;教程使用到的都是Mac官方的命令。制作好的ISO格式镜像可以用于虚拟机安装或者制作到U盘或者直接在Mac里面打开安装升级。 准备系统镜像 首先下载好macOS 镜像…

上海亚商投顾:沪指延续反弹涨0.67% AI概念股掀跌停潮

指数今日低开高走&#xff0c;沪指午后一度涨超1%&#xff0c;以保险为首的大金融板块拉升&#xff0c;中国平安在一季报驱动下&#xff0c;迎来久违涨停&#xff0c;成交超120亿元。医药股全天强势&#xff0c;何氏眼科、金石亚药、普蕊斯、天宇股份20CM涨停&#xff0c;第一医…

约翰霍普金斯大学诺奖得主涉嫌造假,撤回5篇PNAS论文

2019年&#xff0c;约翰霍普金斯大学的著名基因医学科学家Gregg L. Semenza博士因为“发现细胞如何感知和适应氧气供应”&#xff0c;和另外两名科学家&#xff08; William Kaelin Jr. and Peter J. Ratcliffe&#xff09;分享当年的生理医学诺贝尔奖。 近期&#xff0c;Gregg…

SpringBoot整合WebSocket详细教程

预期效果 共开启两个页面&#xff0c;实现一对一聊天。 服务端代码&#xff1a;https://gitee.com/lianaozhe/springboot-websocket.git 代码实现逻辑 服务端 导入相关依赖&#xff1a; <dependency><groupId>org.springframework.boot</groupId><art…

Silane-PEG-FITC 硅烷聚乙二醇荧光素FITC-PEG-Silane在医疗设备领域有广泛应用,具有很好的生物相容性。

FITC-PEG-Silane&#xff0c;荧光素-聚乙二醇-硅烷 中文名称&#xff1a;荧光素-聚乙二醇-硅烷 英文名称&#xff1a;FITC-PEG-Silane 存储条件&#xff1a;-20C&#xff0c;避光&#xff0c;避湿 性状 :白色、微黄色粉末或固体&#xff0c;取决于分子量 溶剂&#xff1a;…

新版android studio gradle插件7.4.2.pom一直无法下载问题

android studio同步时候出现org.gradle.api.plugins.UnknownPluginException&#xff0c;Plugin [id: com.android.application, version: 7.4.2] was not found in any of the following sources: pom插件一直无法下载&#xff0c;搞了好几天&#xff0c;简直想砸电脑&#x…

USB转串口芯片CH340与CH341使用问题汇总

USB转串口&#xff1a; CH340C/N/K/E/B/G/T CH341F/B/C/T/A USB转打印口&#xff1a; CH340H/S CH341F/B/A USB转I2C&#xff1a;CH341F/B/C/T/A USB转SPI&#xff1a;CH341F/B/A/H USB转并口/GPIO&#xff1a;CH341F/B/A 芯片供电注意事项 CH340/CH341等 USB 芯片都支…