一、需求背景
当前项目中遇到这样一个需求: 将需要审核的文本提交给人工智能模型接口审核,等待模型接口审核完毕以后拿到审核结果返回给前端展示给用户(另:模型处理数据所消耗的时间会随着用户提交数据的复杂度有所变化)。
以上需求的重点是 等待 ,如果有多个用户在同一时间提交了文本审核需求,且恰好此时此刻模型需要较长时间处理,这样的话,按照通常的代码写法,是无法满足任务需求的,那么应该如何满足这个需求呢?此时此刻,我们可以使用任务队列
来满足这个需求:
二、任务队列的具体实现
1、 代码结构
2、具体实现
注: 一下代码中涉及到公司名或公司简称的包名会使用xxx代替
-
自定义任务接口
ITask.java
package com.xxx.queue.service; import com.xxx.vo.XXXVO; import java.util.List; public interface ITask { /** * 自定义任务内容 */ List<XXXVO> getCorrectByModel() ; }
-
任务接口的实现:
RequestModeTask.java
package com.xxx.queue.service.impl; /** * 请求模型执行任务 * * @author wxz */ public class RequestModelTask implements ITask { /** * 请求模型的内容 */ private final List<String> contexts; /** * JIRA_ID: 请求模型所需要的JIRA_ID */ private final String JIRA_ID; /** * MODEL_URL: 模型地址 */ private final String MODEL_URL; //请求方法需要的参数 public RequestModelTask( List<String> contexts, String jira_id, String model_url) { this.contexts = contexts; JIRA_ID = jira_id; MODEL_URL = model_url; } /** * 调用模型接口获取修改建议 * * @return List<XXXVO> */ @Override public List<XXXVO> getCorrectByModel() { // 以下为具体的执行方法,可以根据需求写自己的方法 } }
-
执行任务的执行体
TaskExecutor.java
package com.xxx.queue; import com.xxx.queue.service.ITask; import java.util.concurrent.BlockingQueue; /** * 处理任务的窗口 * * @author wxz */ public class TaskExecutor extends Thread { /** * 执行体队列 */ private final BlockingQueue<ITask> taskQueue; /** * 窗口的当前处理事务状态 初始化:窗口工作状态开启 */ private boolean isRunning = true; public TaskExecutor(BlockingQueue<ITask> taskQueue) { this.taskQueue = taskQueue; } /** * 窗口工作状态关闭 */ public void quit() { isRunning = false; interrupt(); } @Override public void run() { // 窗口工作开启状态时 等待处理事务 while (isRunning) { ITask iTask; try { //任务执行体进来 如果没有时间 继续等待处理事务 iTask = taskQueue.take(); iTask.getCorrectByModel(); } catch (InterruptedException e) { if (!isRunning) { interrupt(); break; } } } } }
-
任务队列
RequestModelTaskQueue.java
package com.xxx.queue; import com.xxx.queue.service.ITask; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; /** * 请求模型的任务队列 */ public class RequestModelTaskQueue { /** * 排队办事的执行体 */ private final BlockingQueue<ITask> taskQueue; /** * 处理执行体的多个窗口 */ public TaskExecutor[] mTaskExecutors; /** * 创建队列的时候 设定窗口数量 * * @param size 窗口数量 */ public RequestModelTaskQueue( int size) { taskQueue = new LinkedBlockingQueue<>(); mTaskExecutors = new TaskExecutor[size]; } /** *开始启动 */ public void start() { //防止存在未关闭窗口 如果有先关闭 stop(); for (int i = 0; i < mTaskExecutors.length; i++) { mTaskExecutors[i] = new TaskExecutor(taskQueue); mTaskExecutors[i].start(); } } /** * 关闭执行体 */ public void stop() { if (mTaskExecutors != null) { for (TaskExecutor taskExecutor : mTaskExecutors) { if (taskExecutor != null) { taskExecutor.quit(); } } } } /** * 允许执行体添加进来 * * @param task 目标任务 * @return 排队人数 */ public <T extends ITask> int add(T task) { if (!taskQueue.contains((task))) { taskQueue.add(task); } //当前排队的执行体数 return taskQueue.size(); } public int getTaskQueueSize(){ return taskQueue.size(); } }
-
如何调用
@Service @RequiredArgsConstructor public class xxxServiceImpl implements IxxxService { @Value("${project.MAX_ALLOW_TEXT_LENGTH}") private int MAX_ALLOW_TEXT_LENGTH; //最大允许的句子长度 @Value("${project.JIRA_ID}") private String JIRA_ID;//请求模型的jiraID参数 @Value("${project.MODEL_URL}") private String MODEL_URL; //模型地址 //开启请求模型的队列窗口 RequestModelTaskQueue requestModelTaskQueue = new RequestModelTaskQueue(5); @Override @Transactional(rollbackFor = Exception.class) public List<XXXVO> checkContent(XXX xxx) { //开启请求任务窗口 requestModelTaskQueue.start(); //初始化任务 RequestModelTask task = new RequestModelTask(finalList, JIRA_ID, MODEL_URL); //将任务添加进队列里面 requestModelTaskQueue.add(task); //调用队列方法获取结果 List<XXXVO> voList = task.getCorrectByModel(); //xxx 一些其他的方法 }
上述就是任务队列的Java实现基本过程。