【Java】记一次服务内实现排队消费模式

news2024/9/20 12:39:01

主要是记录一下实现过程和实现的过程中遇到的坑。

我的业务

系统中有一个接口,是从大数据那边拉数据,之前的做法是,开个线程池,让SQL去执行,可是如果大量的慢SQL同时,请求数据库的话会适得其反。并且还有一个问题,就是数据库连接池的连接数是有限的,当慢查询把连接都占用了的话,其他的快查询就会获取不到连接而等待超时。

解决方案

方案一

给慢查询一个单独的连接池,控制连接个数
● 缺点:这样虽然解决了,慢查询阻塞快查询,但是,对于用户体验不好,可能某一个用户就将慢查询队列占满了,后续的其它用户根本查询不了。

方案二

识别慢查询,针对每个用户的慢查询,加一个分布式锁
● 缺点:这样确实能避免一个用户占满慢队列,可是,每次一个用户只能执行一个慢查询,后续如果想增加用户同时执行慢查询的数量将非常困难。

方案三

识别慢查询,给每个用户创建一个队列,每个用户的慢查询单独排队。

最终解决方案和实现

最终选择了方案三,虽然可能也不是最优的解决方案,但是考虑到我们的系统数据TO B系统,用户量不会太大。

自定义任务类

这里有个坑,我后面再说

package com.t4f.bi.data.insights.server.manager.gamedata.query.queue;

import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Supplier;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

/**
 * 队列查询任务
 *
 * @author limingyang1
 * @date 2023/7/3
 * @version 0.0.1
 */
@Slf4j
@Getter
@Setter
public class QueryQueueTask implements Callable<String> {

    private String taskId;
    private Supplier<String> task;

    public QueryQueueTask(String id, Supplier<String> task) {
        this.taskId = id;
        this.task = task;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        QueryQueueTask that = (QueryQueueTask) o;
        return Objects.equals(taskId, that.taskId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(taskId);
    }

    @Override
    public String call() throws Exception {
        try{
            return task.get();
        } catch (Exception e) {
            log.info("QueryQueueTask call error", e);
            return null;
        }
    }
}
用户慢查询队列

这里之所以加锁,是防止多线程同事修改USER_TASK_QUEUE队列,我这里用的Java的程序锁是因为我们这个服务部署只有一台机器,如果线上是多台的话,需要改为分布式锁。

package com.t4f.bi.data.insights.server.manager.gamedata.query.queue;

import cn.hutool.core.util.ObjectUtil;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

/**
 * 慢查询队列
 *
 * @author limingyang1
 * @date 2023/10/20
 * @version 0.0.1
 */
@Slf4j
@Getter
@Setter
public class UserSlowQueryQueue {

    public static final Map<String, BlockingQueue<QueryQueueTask>> USER_TASK_QUEUE = new ConcurrentHashMap<>();
    private static final Map<String, ReentrantLock> USER_LOCKS = new ConcurrentHashMap<>();

    public static boolean add(String user, QueryQueueTask task) {
        log.info(">> 用户:{} 添加慢查询任务:{}", user, task.getTaskId());
        ReentrantLock userLock = USER_LOCKS.computeIfAbsent(user, key -> new ReentrantLock());
        userLock.lock();
        try {
            BlockingQueue<QueryQueueTask> userTaskQueue =
                    USER_TASK_QUEUE.computeIfAbsent(user, key -> new LinkedBlockingQueue<>());
            if (userTaskQueue.contains(task)) {
                log.info(">> 用户:{} 慢查询任务:{} 已经存在", user, task.getTaskId());
                return true;
            }
            userTaskQueue.add(task);
            USER_TASK_QUEUE.put(user, userTaskQueue);
        } finally {
            userLock.unlock();
        }
        return true;
    }


    public static boolean contains(String user, QueryQueueTask task) {
        ReentrantLock userLock = USER_LOCKS.computeIfAbsent(user, key -> new ReentrantLock());
        userLock.lock();
        try {
            BlockingQueue<QueryQueueTask> userTaskQueue =
                USER_TASK_QUEUE.computeIfAbsent(user, key -> new LinkedBlockingQueue<>());
                return userTaskQueue.contains(task);
        } finally {
            userLock.unlock();
        }
    }

    public static void remove(String user, String taskId) {
        log.info(">> 用户:{} 移除慢查询任务:{}", user, taskId);
        ReentrantLock userLock = USER_LOCKS.computeIfAbsent(user, key -> new ReentrantLock());
        userLock.lock();
        try {
            BlockingQueue<QueryQueueTask> userQueue = USER_TASK_QUEUE.get(user);
            if (ObjectUtil.isNotNull(userQueue) && !userQueue.isEmpty()) {
                userQueue.removeIf(task -> task.getTaskId().equals(taskId));
            }
        } finally {
            userLock.unlock();
        }
    }
}
慢查询队列消费
package com.t4f.bi.data.insights.server.listener;

import static com.t4f.bi.data.insights.server.consts.Consts.REQUEST_ID_KEY;

import cn.hutool.core.util.IdUtil;
import com.t4f.bi.data.insights.server.config.MdcTaskDecorator;
import com.t4f.bi.data.insights.server.context.RequestContext;
import com.t4f.bi.data.insights.server.manager.gamedata.query.queue.QueryQueueTask;
import com.t4f.bi.data.insights.server.manager.gamedata.query.queue.UserSlowQueryQueue;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

/**
 * 队列监听
 *
 * @author limingyang1
 * @date 2023/9/19
 * @version 0.0.1
 */
@Slf4j
@Component
public class QueueListener implements InitializingBean {

    private final Map<String, Executor> USER_EXECUTOR = new ConcurrentHashMap<>();

    @Override
    public void afterPropertiesSet() throws Exception {
        // 启动队列消费线程
        startQueueListener();
    }

    private void startQueueListener() {
        log.info(">> 用户慢查询队列消费线程监听中");
        // 启动队列消费线程
        CompletableFuture.runAsync(
                () -> {
                    // 循环监听每个数据源的快慢队列
                    while (true) {
                        try {
                            // 休眠500ms
                            TimeUnit.MILLISECONDS.sleep(500);
                            Map<String, BlockingQueue<QueryQueueTask>> userTaskQueue =
                                    UserSlowQueryQueue.USER_TASK_QUEUE;
                            if (userTaskQueue.isEmpty()) {
                                continue;
                            }

                            userTaskQueue.forEach(
                                    (user, queue) -> {
                                        // 已经存在消费线程则不再创建
                                        Executor executor = getOrCreateExecutor(user);
                                        startConsuming(user, queue, executor);
                                    });

                        } catch (Exception e) {
                            log.error("Queue listener error", e);
                        }
                    }
                },
                startListenerExecutor("queue-listener"));
    }

    private Executor getOrCreateExecutor(String user) {
        return USER_EXECUTOR.computeIfAbsent(user, this::startExecutor);
    }

    private void startConsuming(
            String user, BlockingQueue<QueryQueueTask> queue, Executor executor) {
        QueryQueueTask task = queue.poll();
        if (task == null) {
            return;
        }
        CompletableFuture.runAsync(
                () -> {
                    String requestId = task.getRequestId();
                    MDC.put(REQUEST_ID_KEY, requestId);
                    RequestContext.initContext();
                    try {
                        RequestContext.putValue(REQUEST_ID_KEY, requestId);
                        log.info(">> 用户:{} 慢查询: {} 慢查询开始异步执行", user, queue.size());
                        String taskId = task.call();
                        log.info(">> 用户:{} 慢查询: {} 执行完毕", user, taskId);
                    } catch (Exception e) {
                        // handle exception
                        log.error("Queue execute error", e);
                    } finally{
                        RequestContext.clearContext();
                    }
                },
                executor);
    }

    public Executor startExecutor(String name) {
        log.info(">> 创建用户: {} 的消费处理线程池", name);
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setTaskDecorator(new MdcTaskDecorator());
        // 这里控制同事执行任务的个数
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setKeepAliveSeconds(60);
        executor.setQueueCapacity(1000);
        executor.setThreadNamePrefix("[" + name + "]query-queue-run-task");
        // 拒绝策略:丢弃队列中最老的任务
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        executor.initialize();
        return executor;
    }

    public Executor startListenerExecutor(String name) {
        log.info(">> 创建用户的消费监听线程池");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setTaskDecorator(new MdcTaskDecorator());
        executor.setCorePoolSize(1);
        executor.setDaemon(true);
        executor.setMaxPoolSize(1);
        executor.setThreadNamePrefix("[" + name + "]-run-task");
        // 拒绝策略:丢弃队列中最老的任务
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        executor.initialize();
        return executor;
    }
}
如何使用
QueryQueueTask task =
        new QueryQueueTask(
                id,
                () -> {
                    // TODO: 要执行的业务
        
                    return taskId;
                });
UserSlowQueryQueue.add(taskId, task);

遇到的坑

  1. while循环监听的时候,一定要记得设置休眠,不然会让CPU飙升
    在这里插入图片描述

  2. 在自定义任务类时,一定要实现Callable接口,而不是Runnable,因为我们在后面取出队列中的任务执行时,是希望它执行完成再释放线程,实现Runnable的话,到时候就是task.run() 方法,并不会阻塞到任务执行完毕,起不到排队的作用。实现Callable接口,则调用task.call()等待执行完成,达到慢查询排队的效果
    在这里插入图片描述
    在这里插入图片描述

  3. 任务监听程序中,在往消费线程添加任务时,一定要确保任务的有效性,不然会使线程池队列占满,触发拒绝策略,因为你相当于循环的往线程池丢无效的任务,看下面代码的区别

    // 错误的方式
    private void startConsuming(
            String user, BlockingQueue<QueryQueueTask> queue, Executor executor) {
        CompletableFuture.runAsync(
                () -> {
                    QueryQueueTask task = queue.poll();
                    // 判断任务是否有效
                    if (task == null) {
                        return;
                    }
                    String requestId = task.getRequestId();
                    MDC.put(REQUEST_ID_KEY, requestId);
                    RequestContext.initContext();
                    try {
                        RequestContext.putValue(REQUEST_ID_KEY, requestId);
                        log.info(">> 用户:{} 慢查询: {} 慢查询开始异步执行", user, queue.size());
                        String taskId = task.call();
                        log.info(">> 用户:{} 慢查询: {} 执行完毕", user, taskId);
                    } catch (Exception e) {
                        // handle exception
                        log.error("Queue execute error", e);
                    } finally{
                        RequestContext.clearContext();
                    }
                },
                executor);
    }
    
// 正确的方式
    private void startConsuming(
            String user, BlockingQueue<QueryQueueTask> queue, Executor executor) {
        QueryQueueTask task = queue.poll();
        // 判断任务是否有效
        if (task == null) {
            return;
        }
        CompletableFuture.runAsync(
                () -> {
                    String requestId = task.getRequestId();
                    MDC.put(REQUEST_ID_KEY, requestId);
                    RequestContext.initContext();
                    try {
                        RequestContext.putValue(REQUEST_ID_KEY, requestId);
                        log.info(">> 用户:{} 慢查询: {} 慢查询开始异步执行", user, queue.size());
                        String taskId = task.call();
                        log.info(">> 用户:{} 慢查询: {} 执行完毕", user, taskId);
                    } catch (Exception e) {
                        // handle exception
                        log.error("Queue execute error", e);
                    } finally{
                        RequestContext.clearContext();
                    }
                },
                executor);
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1194003.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python语法基础(字符串 列表 元组 字典 集合)

目录 字符串(str)字符串的创建特殊情况字符串的转义字符字符串的运算符字符串常用方法求字符串长度去掉多余空格是否包含某子串分割字符串合并字符串替换字符串统计统计字符串出现的次数 练习&#xff1a;判断字符串是否为回文串 列表(list)列表的创建列表常用方法遍历列表列表…

小程序如何设置下单提示语句

下单提示会展示在购物车和提交订单页面&#xff0c;它可以帮助商家告知客户事项&#xff0c;提高用户体验和减少错误操作。例如提示&#xff1a;商品是否包邮、某些区域是否发货、商品送达时间等等。 在小程序管理员后台->配送设置处&#xff0c;填写下单提示。在设置下单提…

基于ssm的高校失物招领管理系统

基于ssm的高校失物招领管理系统 摘要 失物招领管理系统是一种利用现代信息技术&#xff0c;为高校提供高效、便捷的失物招领服务的平台。本系统基于SSM框架&#xff08;Spring SpringMVC MyBatis&#xff09;&#xff0c;充分利用了各框架的优势&#xff0c;实现了系统的稳定…

1.微服务与SpringCloud

微服务和SpringCloud 文章目录 微服务和SpringCloud1.什么是微服务2.SpringCloud3. 微服务 VS SpringCloud4. SpringCloud 组件5.参考文档6.版本要求 1.什么是微服务 微服务是将一个大型的、单一的应用程序拆分成多个小型服务&#xff0c;每个服务实现特定的业务功能&#xff…

C#上位机序列10: Winform上位机通用框架

C#上位机序列1: 多线程&#xff08;线程同步&#xff0c;事件触发&#xff0c;信号量&#xff0c;互斥锁&#xff0c;共享内存&#xff0c;消息队列&#xff09; C#上位机序列2: 同步异步(async、await) C#上位机序列3: 流程控制&#xff08;串行&#xff0c;并行&#xff0c…

防火防盗防小人 使用 Jasypt 库来加密配置文件

⚔️ 项目配置信息存放在哪&#xff1f; 在日常开发工作中&#xff0c;我们经常需要使用到各种敏感配置&#xff0c;如数据库密码、各厂商的 SecretId、SecretKey 等敏感信息。 通常情况下&#xff0c;我们会将这些敏感信息明文放到配置文件中&#xff0c;或者放到配置中心中。…

原厂监视综合控制继电器 ZZS-7/1 AC220V 凸出端子固定安装

ZZS-7/11分闸、合闸、电源监视综合控制装置&#xff1b; ZZS-7/12分闸、合闸、电源监视综合控制装置&#xff1b; ZZS-7/13分闸、合闸、电源监视综合控制装置&#xff1b; ZZS-7/14分闸、合闸、电源监视综合控制装置&#xff1b; ZZS-7/102分闸、合闸、电源监视综合控制装置…

GIT的安装与常见命令

Git的介绍 Git是一个开源的分布式版本控制系统&#xff0c;最初由Linus Torvalds在2005年创建用于管理Linux内核的开发&#xff0c;现在已成为全球最流行的版本控制工具之一。 Git可以跟踪代码的修改&#xff0c;记录开发历程&#xff0c;保证多人合作开发时代码的一致性&…

关于maven读取settings.xml文件的优先级问题

今天在IDEA中配置maven的setting.xml文件路径指向的.m2路径下的setting_a.xml文件&#xff0c;同时&#xff0c;我的maven3.6.3也放在.m2中。 [1] .m2文件夹 [2] apache-maven-3.6.3文件夹 然后&#xff0c;在IDEA中打包发布时发现&#xff0c;无论如何都读取不到指定的setti…

【Linux】Linux常用命令—磁盘管理、压缩包管理

创作不易&#xff0c;本篇文章如果帮助到了你&#xff0c;还请点赞 关注支持一下♡>&#x16966;<)!! 主页专栏有更多知识&#xff0c;如有疑问欢迎大家指正讨论&#xff0c;共同进步&#xff01; &#x1f525;c系列专栏&#xff1a;C/C零基础到精通 &#x1f525; 给大…

猫罐头哪家好?宠物店自用的5款猫罐头推荐!猫咪嘎嘎炫~

亲爱的铲屎官们&#xff0c;你们是否会为猫咪选购猫罐头而感到烦恼&#xff1f;你们是否渴望了解哪些猫罐头在宠物界有着良好的口碑&#xff1f;猫罐头&#xff0c;作为猫咪日常饮食中的重要组成部分&#xff0c;其品质直接影响到猫咪的健康和幸福。 猫罐头哪家好&#xff1f;作…

Vue的vant notify组件报错Notify is not defined

解决方法&#xff1a; 原创作者&#xff1a;吴小糖 创作时间&#xff1a;2023.11.10

Java自学第8课:电商项目(3) - 重新搭建环境

由于之前用的jdk和eclipse&#xff0c;以及mysql并不是视频教程所采用的&#xff0c;在后面运行源码和使用作者提供源码时&#xff0c;总是报错&#xff0c;怀疑&#xff1a; 1 数据库有问题 2 jdk和引入的jar包不匹配 3 其他什么未知的错误&#xff1f; 所以决定卸载jdk e…

Unity 一些内置宏定义

在Unity中&#xff0c;有一些内置的宏定义可用于不同的平台。以下是一些常见的平台内置宏定义&#xff1a; 1、UNITY_EDITOR&#xff1a;在Unity编辑器中运行。 2、UNITY_EDITOR_WIN&#xff1a;在Unity编辑器运行在Windows操作系统时被定义。 3、UNITY_STANDALONE&#xff1a…

QT QDockWidget

QDockWidget是Qt中的一个容器类&#xff0c;用于在主窗口上创建可停靠的子窗口。 设置停靠窗口的一般流程如下: (1)创建一个QDockWidget 对象的停靠窗体。 (2)设置此停靠窗体的属性&#xff0c;通常调用setFeatures()及setAllowedAreas()两种方法。 (3)新建一个要插入停靠窗…

MATLAB仿真通信系统的眼图

eyediagram eyediagram(complex(used_i,used_q),1100)

Apex R5在线粒子计数器 制药企业在线粒子实时监测系统解决方案

医疗保健生产设施的质量和校准面临的一个令人沮丧的问题是&#xff0c;在校准时发现仪器超出公差或损坏。这需要耗时且成本高昂的调查&#xff0c;这可能会影响到产品。由于空气中颗粒物计数器是世界各地制药、生物制药和医疗保健设施环境监测中使用的重要工具&#xff0c;因此…

【Spring】SpringBoot配置文件

SpringBoot配置文件 配置文件作用SpringBoot配置文件配置文件快速入手配置文件的格式properties配置文件说明基本语法读取配置文件properties缺点分析 yml配置文件说明yml基本语法yml使用进阶yml配置读取配置对象配置集合配置Mapyml优缺点 配置文件作用 计算机上有数以千计的配…

Selenium+Python自动化测试环境搭建

selenium python 自动化测试 —— 环境搭建 关于 selenium Selenium 是一个用于Web应用程序测试的工具。Selenium测试直接运行在浏览器中&#xff0c;就像真正的用户在操作一样。支持的浏览器包括IE(7、8、9)、Mozilla Firefox、Mozilla Suite等。 Selenium 框架底层使用JavaS…

Css问题:推荐几个超好看渐变色!项目中可用

前端功能问题系列文章&#xff0c;点击上方合集↑ 序言 大家好&#xff0c;我是大澈&#xff01; 本文约2000字&#xff0c;整篇阅读大约需要3分钟。 本文主要内容分三部分&#xff0c;第一部分是需求分析&#xff0c;第二部分是实现步骤&#xff0c;第三部分是问题详解。 …