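This post records how I implemented the feature and the pitfalls I ran into along the way.
My use case
Our system has an endpoint that pulls data from the big-data platform. Previously we opened a thread pool and let it run the SQL, but when a large number of slow queries hit the database at the same time, this backfires. There is a second problem: the database connection pool has a limited number of connections, and once slow queries occupy all of them, fast queries cannot get a connection and time out while waiting.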
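Options
Option 1
Give slow queries a dedicated connection pool and cap the number of connections.
● Drawback: this stops slow queries from blocking fast ones, but it is bad for user experience. A single user can fill the whole slow-query pool, and other users' slow queries cannot run at all until it frees up.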
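The drawback of Option 1 can be illustrated with a minimal sketch. It assumes a `java.util.concurrent.Semaphore` as a stand-in for a dedicated pool with a fixed number of connections; the class name `SlowQueryGate` and its methods are hypothetical and are not part of our system.

```java
import java.util.concurrent.Semaphore;

/** Hypothetical stand-in for a dedicated slow-query pool with a fixed number of slots. */
class SlowQueryGate {
    private final Semaphore slots;

    SlowQueryGate(int maxSlowQueries) {
        this.slots = new Semaphore(maxSlowQueries);
    }

    /** Takes a slot without waiting; returns false when every slot is in use. */
    boolean tryAcquire() {
        return slots.tryAcquire();
    }

    /** Gives a slot back once the slow query has finished. */
    void release() {
        slots.release();
    }

    public static void main(String[] args) {
        SlowQueryGate gate = new SlowQueryGate(2);
        // User A starts two slow queries and holds both slots.
        boolean a1 = gate.tryAcquire();
        boolean a2 = gate.tryAcquire();
        // User B arrives while A's queries are still running and gets nothing.
        boolean b1 = gate.tryAcquire();
        System.out.println("A1=" + a1 + " A2=" + a2 + " B1=" + b1);
        gate.release();
        gate.release();
    }
}
```

The limit is global, so nothing in this design knows which user holds the slots. That is exactly why one user can starve everyone else.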
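Option 2
Identify slow queries and guard each user's slow queries with a distributed lock.
● Drawback: this does stop one user from filling the slow-query pool, but each user can then run only one slow query at a time, and raising the number of slow queries a user may run concurrently later on would be very hard.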
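A minimal single-process sketch of the per-user lock in Option 2 follows. It assumes a concurrent set of user names as a stand-in for the distributed lock (a real deployment would use an external lock service); `PerUserSlowQueryLock` and its methods are hypothetical names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-process stand-in for a per-user distributed lock. */
class PerUserSlowQueryLock {
    // Users that currently have a slow query running.
    private final Set<String> usersWithRunningQuery = ConcurrentHashMap.newKeySet();

    /** Returns true if the user had no slow query running and now holds the lock. */
    boolean tryLock(String user) {
        return usersWithRunningQuery.add(user);
    }

    /** Releases the user's lock once the slow query has finished. */
    void unlock(String user) {
        usersWithRunningQuery.remove(user);
    }

    public static void main(String[] args) {
        PerUserSlowQueryLock lock = new PerUserSlowQueryLock();
        System.out.println(lock.tryLock("alice")); // first slow query for alice
        System.out.println(lock.tryLock("alice")); // second one while the first is running
        System.out.println(lock.tryLock("bob"));   // a different user is unaffected
        lock.unlock("alice");
    }
}
```

A lock is either held or not, so allowing two or three concurrent slow queries per user means replacing it with a counter or a queue. That is the limitation described above.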
Option 3
Identify slow queries and create one queue per user, so that each user's slow queries wait in their own queue.
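Final solution and implementation
I went with Option 3. It may not be the optimal solution, but our system is a B2B system, so the number of users will not be very large.
Custom task class
There is a pitfall here that I will come back to later.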
package com.t4f.bi.data.insights.server.manager.gamedata.query.queue;

import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.function.Supplier;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

/**
 * Queued query task.
 *
 * @author limingyang1
 * @date 2023/7/3
 * @version 0.0.1
 */
@Slf4j
@Getter
@Setter
public class QueryQueueTask implements Callable<String> {
  private String taskId;
  // Assumption: QueueListener calls getRequestId(), so the field is declared here.
  // Set it with setRequestId(...) before enqueueing the task.
  private String requestId;
  private Supplier<String> task;

  public QueryQueueTask(String id, Supplier<String> task) {
    this.taskId = id;
    this.task = task;
  }

  // Two tasks are equal when they share a taskId; the queue uses this to skip duplicates.
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    QueryQueueTask that = (QueryQueueTask) o;
    return Objects.equals(taskId, that.taskId);
  }

  @Override
  public int hashCode() {
    return Objects.hash(taskId);
  }

  // Runs the query on the calling thread; returns null if the query fails.
  @Override
  public String call() throws Exception {
    try {
      return task.get();
    } catch (Exception e) {
      log.error("QueryQueueTask call error", e);
      return null;
    }
  }
}
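Per-user slow query queue
The lock is there to keep multiple threads from modifying the USER_TASK_QUEUE queues at the same time. I use an in-process Java lock because this service is deployed on a single machine; if production runs multiple instances, it has to be replaced with a distributed lock.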
package com.t4f.bi.data.insights.server.manager.gamedata.query.queue;

import cn.hutool.core.util.ObjectUtil;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

/**
 * Slow query queue.
 *
 * @author limingyang1
 * @date 2023/10/20
 * @version 0.0.1
 */
@Slf4j
@Getter
@Setter
public class UserSlowQueryQueue {
  // One queue of pending slow queries per user.
  public static final Map<String, BlockingQueue<QueryQueueTask>> USER_TASK_QUEUE =
      new ConcurrentHashMap<>();
  // One lock per user, so users do not block each other.
  private static final Map<String, ReentrantLock> USER_LOCKS = new ConcurrentHashMap<>();

  public static boolean add(String user, QueryQueueTask task) {
    log.info(">> user: {} adding slow query task: {}", user, task.getTaskId());
    ReentrantLock userLock = USER_LOCKS.computeIfAbsent(user, key -> new ReentrantLock());
    userLock.lock();
    try {
      BlockingQueue<QueryQueueTask> userTaskQueue =
          USER_TASK_QUEUE.computeIfAbsent(user, key -> new LinkedBlockingQueue<>());
      if (userTaskQueue.contains(task)) {
        log.info(">> user: {} slow query task: {} already exists", user, task.getTaskId());
        return true;
      }
      // computeIfAbsent has already stored the queue in the map, so no extra put is needed.
      userTaskQueue.add(task);
    } finally {
      userLock.unlock();
    }
    return true;
  }

  public static boolean contains(String user, QueryQueueTask task) {
    ReentrantLock userLock = USER_LOCKS.computeIfAbsent(user, key -> new ReentrantLock());
    userLock.lock();
    try {
      // Use get() so that a lookup does not create an empty queue for the user.
      BlockingQueue<QueryQueueTask> userTaskQueue = USER_TASK_QUEUE.get(user);
      return userTaskQueue != null && userTaskQueue.contains(task);
    } finally {
      userLock.unlock();
    }
  }

  public static void remove(String user, String taskId) {
    log.info(">> user: {} removing slow query task: {}", user, taskId);
    ReentrantLock userLock = USER_LOCKS.computeIfAbsent(user, key -> new ReentrantLock());
    userLock.lock();
    try {
      BlockingQueue<QueryQueueTask> userQueue = USER_TASK_QUEUE.get(user);
      if (ObjectUtil.isNotNull(userQueue) && !userQueue.isEmpty()) {
        userQueue.removeIf(task -> task.getTaskId().equals(taskId));
      }
    } finally {
      userLock.unlock();
    }
  }
}
Consuming the slow query queues
package com.t4f.bi.data.insights.server.listener;

import static com.t4f.bi.data.insights.server.consts.Consts.REQUEST_ID_KEY;

import cn.hutool.core.util.IdUtil;
import com.t4f.bi.data.insights.server.config.MdcTaskDecorator;
import com.t4f.bi.data.insights.server.context.RequestContext;
import com.t4f.bi.data.insights.server.manager.gamedata.query.queue.QueryQueueTask;
import com.t4f.bi.data.insights.server.manager.gamedata.query.queue.UserSlowQueryQueue;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

/**
 * Queue listener.
 *
 * @author limingyang1
 * @date 2023/9/19
 * @version 0.0.1
 */
@Slf4j
@Component
public class QueueListener implements InitializingBean {
  // One consumer thread pool per user.
  private final Map<String, Executor> USER_EXECUTOR = new ConcurrentHashMap<>();

  @Override
  public void afterPropertiesSet() throws Exception {
    // Start the queue consumer thread.
    startQueueListener();
  }

  private void startQueueListener() {
    log.info(">> user slow query queue listener is running");
    // Start the queue consumer thread.
    CompletableFuture.runAsync(
        () -> {
          // Loop over every user's slow query queue.
          while (true) {
            try {
              // Sleep 500 ms between rounds so the loop does not spin.
              TimeUnit.MILLISECONDS.sleep(500);
              Map<String, BlockingQueue<QueryQueueTask>> userTaskQueue =
                  UserSlowQueryQueue.USER_TASK_QUEUE;
              if (userTaskQueue.isEmpty()) {
                continue;
              }
              userTaskQueue.forEach(
                  (user, queue) -> {
                    // Reuse the user's executor if it exists; create it otherwise.
                    Executor executor = getOrCreateExecutor(user);
                    startConsuming(user, queue, executor);
                  });
            } catch (InterruptedException e) {
              // Restore the interrupt flag and stop listening.
              Thread.currentThread().interrupt();
              return;
            } catch (Exception e) {
              log.error("Queue listener error", e);
            }
          }
        },
        startListenerExecutor("queue-listener"));
  }

  private Executor getOrCreateExecutor(String user) {
    return USER_EXECUTOR.computeIfAbsent(user, this::startExecutor);
  }

  private void startConsuming(
      String user, BlockingQueue<QueryQueueTask> queue, Executor executor) {
    // Poll first, so that nothing is submitted when the queue is empty.
    QueryQueueTask task = queue.poll();
    if (task == null) {
      return;
    }
    CompletableFuture.runAsync(
        () -> {
          String requestId = task.getRequestId();
          MDC.put(REQUEST_ID_KEY, requestId);
          RequestContext.initContext();
          try {
            RequestContext.putValue(REQUEST_ID_KEY, requestId);
            log.info(
                ">> user: {} slow query: {} starts running asynchronously, {} still queued",
                user,
                task.getTaskId(),
                queue.size());
            String taskId = task.call();
            log.info(">> user: {} slow query: {} finished", user, taskId);
          } catch (Exception e) {
            // handle exception
            log.error("Queue execute error", e);
          } finally {
            RequestContext.clearContext();
          }
        },
        executor);
  }

  public Executor startExecutor(String name) {
    log.info(">> creating the consumer thread pool for user: {}", name);
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setTaskDecorator(new MdcTaskDecorator());
    // The pool size controls how many tasks a user can run at the same time.
    executor.setCorePoolSize(2);
    executor.setMaxPoolSize(2);
    executor.setKeepAliveSeconds(60);
    executor.setQueueCapacity(1000);
    executor.setThreadNamePrefix("[" + name + "]query-queue-run-task");
    // Rejection policy: AbortPolicy throws RejectedExecutionException when the queue is full.
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
    executor.initialize();
    return executor;
  }

  public Executor startListenerExecutor(String name) {
    log.info(">> creating the listener thread pool for user queues");
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setTaskDecorator(new MdcTaskDecorator());
    executor.setCorePoolSize(1);
    executor.setDaemon(true);
    executor.setMaxPoolSize(1);
    executor.setThreadNamePrefix("[" + name + "]-run-task");
    // Rejection policy: AbortPolicy throws RejectedExecutionException when the queue is full.
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
    executor.initialize();
    return executor;
  }
}
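How to use it
QueryQueueTask task =
    new QueryQueueTask(
        taskId,
        () -> {
          // TODO: the business logic to run
          return taskId;
        });
// The first argument of add() is the user key that selects the per-user queue.
// `user` is assumed here to hold the current user's identifier.
UserSlowQueryQueue.add(user, task);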
Pitfalls
- When listening in a while loop, always remember to sleep between iterations; otherwise the loop spins and CPU usage spikes.
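- The custom task class implements Callable rather than Runnable. What matters for queueing is that the consumer thread runs the task body synchronously through task.call() and is released only after the query has finished. If the task merely handed the query off to yet another thread, the consumer thread would return immediately and nothing would actually wait in line. Note that calling run() directly on a Runnable is also synchronous; what Callable adds is that the task can return a value (the task ID here) and throw checked exceptions.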
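- In the listener, make sure a task actually exists before handing work to the consumer thread pool. Otherwise the loop keeps submitting empty tasks to the pool, its queue fills up, and the rejection policy is triggered. Compare the two versions below.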
// Wrong: poll() runs inside the async task, so an empty task is submitted on every tick
private void startConsuming(
    String user, BlockingQueue<QueryQueueTask> queue, Executor executor) {
  CompletableFuture.runAsync(
      () -> {
        QueryQueueTask task = queue.poll();
        // Check that a task actually exists (too late: a pool slot is already used)
        if (task == null) {
          return;
        }
        String requestId = task.getRequestId();
        MDC.put(REQUEST_ID_KEY, requestId);
        RequestContext.initContext();
        try {
          RequestContext.putValue(REQUEST_ID_KEY, requestId);
          log.info(">> user: {} slow query: {} starts running asynchronously", user, task.getTaskId());
          String taskId = task.call();
          log.info(">> user: {} slow query: {} finished", user, taskId);
        } catch (Exception e) {
          // handle exception
          log.error("Queue execute error", e);
        } finally {
          RequestContext.clearContext();
        }
      },
      executor);
}
// Right: poll() runs on the listener thread, so only real tasks reach the pool
private void startConsuming(
    String user, BlockingQueue<QueryQueueTask> queue, Executor executor) {
  QueryQueueTask task = queue.poll();
  // Check that a task actually exists before submitting anything
  if (task == null) {
    return;
  }
  CompletableFuture.runAsync(
      () -> {
        String requestId = task.getRequestId();
        MDC.put(REQUEST_ID_KEY, requestId);
        RequestContext.initContext();
        try {
          RequestContext.putValue(REQUEST_ID_KEY, requestId);
          log.info(">> user: {} slow query: {} starts running asynchronously", user, task.getTaskId());
          String taskId = task.call();
          log.info(">> user: {} slow query: {} finished", user, taskId);
        } catch (Exception e) {
          // handle exception
          log.error("Queue execute error", e);
        } finally {
          RequestContext.clearContext();
        }
      },
      executor);
}
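To see why the place where poll() happens matters, here is a small standalone sketch that reproduces the effect with a plain `ThreadPoolExecutor`. It is a simplified model and not the production listener: the class `EmptyPollDemo`, the pool of one thread, and the queue capacity of 3 are assumptions chosen to make the rejection easy to trigger.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: listener ticks against an empty user queue while the worker is busy. */
class EmptyPollDemo {

    /** Returns how many submissions the pool rejected during the given number of ticks. */
    static int simulate(boolean pollBeforeSubmit, int ticks) {
        BlockingQueue<Runnable> userQueue = new LinkedBlockingQueue<>(); // stays empty
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(3),           // small pool queue, assumed for the demo
                new ThreadPoolExecutor.AbortPolicy()); // same policy as in the listener
        CountDownLatch workerStarted = new CountDownLatch(1);
        CountDownLatch releaseWorker = new CountDownLatch(1);
        // Occupy the only worker thread with a long-running "slow query".
        executor.execute(() -> {
            workerStarted.countDown();
            awaitQuietly(releaseWorker);
        });
        awaitQuietly(workerStarted);

        int rejected = 0;
        for (int i = 0; i < ticks; i++) {
            try {
                if (pollBeforeSubmit) {
                    // Right way: poll on the listener thread, submit only real tasks.
                    Runnable task = userQueue.poll();
                    if (task == null) {
                        continue;
                    }
                    executor.execute(task);
                } else {
                    // Wrong way: every tick submits a task, even though there is nothing to run.
                    executor.execute(() -> {
                        Runnable task = userQueue.poll();
                        if (task != null) {
                            task.run();
                        }
                    });
                }
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        releaseWorker.countDown();
        executor.shutdown();
        return rejected;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("poll inside task, rejected: " + simulate(false, 10));
        System.out.println("poll before submit, rejected: " + simulate(true, 10));
    }
}
```

With the worker busy and a pool queue of 3, the first three empty tasks are queued and the remaining seven of the ten ticks are rejected, while polling first submits nothing at all. The production pool has a capacity of 1000, so the same thing takes longer to show up, but a slow query that runs for minutes gives the 500 ms loop plenty of time to fill it.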