智能BI(后端)-- 系统异步化

news2025/1/22 16:01:28

文章目录

  • 系统问题分析
  • 什么是异步化?
  • 业务流程分析
    • 标准异步化的业务流程
    • 系统业务流程
  • 线程池
    • 为什么需要线程池?
    • 线程池两种实现方式
    • 线程池的参数
    • 线程池的开发
  • 项目异步化改造

系统问题分析

问题场景:调用的服务能力有限,或者接口的处理(或返回)时长较长时,就应该考虑异步化了

什么是异步化?

不用等一件事做完,就可以做另外一件事,等第一件事完成时,可以收到一个通知

业务流程分析

标准异步化的业务流程

  1. 当用户要进行耗时很长的操作时,点击提交后,不需要在界面空等,而是应该把这个任务保存到数据库中记录下来
  2. 用户要执行新任务时:
    a. 任务提交成功:
    ⅰ. 若程序存在空闲线程,可以立即执行此任务
    ⅱ. 若所有线程均繁忙,任务将入队列等待处理
    b. 任务提交失败:比如所有线程都在忙碌且任务队列满了
    ⅰ.选择拒绝此任务,不再执行
    ⅱ.通过查阅数据库记录,发现提交失败的任务,并在程序空闲时将这些任务取出执行
  3. 程序(线程)从任务队列中取出任务依次执行,每完成一项任务,就更新任务状态。
  4. 用户可以查询任务的执行状态,或者在任务执行成功或失败时接收通知(例如:发邮件、系统消息提示或短信),从而优化体验
  5. 对于复杂且包含多个环节的任务,在每个小任务完成时,要在程序(数据库中))记录任务的执行状态(进度)。

系统业务流程

  1. 用户点击智能分析页提交按钮时,先把图表立刻保存到数据库中(作为一个任务)
  2. 用户可以在图表管理查看所有图表(已生成的,生成中的,生成失败的)的信息和状态
  3. 用户可以修改生成失败的图表信息,点击重新生成,以尝试再次创建图表
    在这里插入图片描述

问题分析?

  1. 任务队列的最大容量应该设置为多少
  2. 程序怎么从任务队列中取出任务去执行?这个任务队列的流程怎么实现?怎么保证程序最多同时执行多少个任务?

线程池实现

线程池

为什么需要线程池?

  1. 线程的管理比较复杂
  2. 任务存取比较复杂
  3. 线程池可以帮你轻松管理线程,协调任务的执行过程

线程池两种实现方式

  1. Spring中,可以用ThreadPoolTaskExrcutor配合@Async注解来实现(不推荐)
  2. 在Java中,可以使用JUC并发编程包中的ThreadPoolExecutor来实现非常灵活地自定义线程池

线程池的参数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {

现状:AI生成能力的并发是只允许4个任务同时去执行,AI能力允许20个任务排队
corePoolSize(核心线程数):正常情况下,我们的系统应该能同时工作的线程数
maximumPoolSize(最大线程数):极限情况下,我们的线程池所拥有的线程
keepAliveTime(空闲线程存活时间):非核心线程在没有任务的情况下,过多久要删除,从而释放无用的线程资源
unit(空闲线程存活时间的单位):分钟,秒
workQueue(工作队列):用于存放给线程执行的任务,存在一个队列的长度(一定要设置)
threadFactory(线程工厂):控制每个线程的生成,线程的属性
RejectedExecutionHandler(拒绝策略):任务队列满的时候,我们采取什么措施

线程池的开发

自定义线程池配置

package com.yupi.springbootinit.config;

import org.jetbrains.annotations.NotNull;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Configuration
public class ThreadPoolExecutorConfig {
    @Bean
    public ThreadPoolExecutor threadPoolExecutor(){
        // 创建一个线程工厂
        ThreadFactory threadFactory = new ThreadFactory(){
            // 初始化线程数为 1
            private int count = 1;
            // 创建一个新的线程
            @Override
            // 每当线程池需要创建新线程时,就会调用newThread方法
            // @NotNull Runnable r 表示方法参数 r 应该永远不为null,
            public Thread newThread(@NotNull Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("线程" + count ++);
                return thread;
            }
        };
        // 创建一个新的线程池,线程池核心大小为2,最大线程数为4,
        // 非核心线程空闲时间为100秒,任务队列为阻塞队列,长度为4,使用自定义的线程工厂创建线
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,4,100, TimeUnit.SECONDS,new ArrayBlockingQueue<>(4),threadFactory);
        return threadPoolExecutor;
    }
}

测试controller层(注意线上环境不要暴露出去)

package com.yupi.springbootinit.controller;

import cn.hutool.json.JSONUtil;
import io.netty.handler.codec.serialization.ObjectEncoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Profile;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 队列测试controller
 */
@RestController
@RequestMapping("/queue")
@Slf4j
@Profile({"dev","local"}) // 只在开发环境和本地环境生效
public class QueueController {


    @Resource
    private ThreadPoolExecutor threadPoolExecutor;

    @GetMapping("/add")
    // 接收一个参数name,然后将任务添加到线程池中
    public void add(String name){
    // 使用CompletableFuture运行一个异步任务
        CompletableFuture.runAsync(()->{

            log.info("任务执行中:" + name + "执行人:" + Thread.currentThread().getName());
            try {
                // 让线程休眠10分钟,模拟长时间运行的任务
                Thread.sleep(600000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
             异步任务在threadPoolExecutor中执行
        },threadPoolExecutor);
    }


    @GetMapping("/get")
    public String  get(){
        Map<String, Object> map = new HashMap<>();
        int size = threadPoolExecutor.getQueue().size();
        map.put("队列长度",size);
        long taskCount = threadPoolExecutor.getTaskCount();
        map.put("任务总数",taskCount);
        long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
        map.put("已完成的任务总数",completedTaskCount);
        int activeCount = threadPoolExecutor.getActiveCount();
        map.put("正在工作的线程数",activeCount);
        return JSONUtil.toJsonStr(map);
    }
}

项目异步化改造

/**
 * 智能分析(异步)
 *
 * @param multipartFile
 * @param genChartByAiRequest
 * @param request
 * @return
 */
@PostMapping("/gen/async")
public BaseResponse<BiResponse> genChartByAiAsync(@RequestPart("file") MultipartFile multipartFile,
                                                  GenChartByAiRequest genChartByAiRequest, HttpServletRequest request) {
    String name = genChartByAiRequest.getName();
    String goal = genChartByAiRequest.getGoal();
    String chartType = genChartByAiRequest.getChartType();

    // 校验
    ThrowUtils.throwIf(StringUtils.isBlank(goal), ErrorCode.PARAMS_ERROR, "目标为空");
    ThrowUtils.throwIf(StringUtils.isNotBlank(name) && name.length() > 100, ErrorCode.PARAMS_ERROR, "名称过长");

    // 校验文件
    long size = multipartFile.getSize();
    String originalFilename = multipartFile.getOriginalFilename();

    // 校验文件大小
    final long ONE_MB = 1024 * 1024L;
    ThrowUtils.throwIf(size > ONE_MB, ErrorCode.PARAMS_ERROR, "文件超过 1M");

    // 校验文件大小缀 aaa.png
    String suffix = FileUtil.getSuffix(originalFilename);
    final List<String> validFileSuffixList = Arrays.asList("xlsx", "xls");
    ThrowUtils.throwIf(!validFileSuffixList.contains(suffix), ErrorCode.PARAMS_ERROR, "文件后缀非法");

    User loginUser = userService.getLoginUser(request);

    // 限流判断,每个用户一个限流器
    redisLimiterManager.doRateLimit("genChartByAi_" + loginUser.getId());

    // 指定一个模型id(把id写死,也可以定义成一个常量)
    long biModelId = 1659171950288818178L;
    // 分析需求:
    // 分析网站用户的增长情况
    // 原始数据:
    // 日期,用户数
    // 1号,10
    // 2号,20
    // 3号,30

    // 构造用户输入
    StringBuilder userInput = new StringBuilder();
    userInput.append("分析需求:").append("\n");

    // 拼接分析目标
    String userGoal = goal;
    if (StringUtils.isNotBlank(chartType)) {
        userGoal += ",请使用" + chartType;
    }
    userInput.append(userGoal).append("\n");
    userInput.append("原始数据:").append("\n");
    // 压缩后的数据
    String csvData = ExcelUtils.excelToCsv(multipartFile);
    userInput.append(csvData).append("\n");

    // 先把图表保存到数据库中
    Chart chart = new Chart();
    chart.setName(name);
    chart.setGoal(goal);
    chart.setChartData(csvData);
    chart.setChartType(chartType);
    // 插入数据库时,还没生成结束,把生成结果都去掉
//        chart.setGenChart(genChart);
//        chart.setGenResult(genResult);
    // 设置任务状态为排队中
    chart.setStatus("wait");
    chart.setUserId(loginUser.getId());
    boolean saveResult = chartService.save(chart);
    ThrowUtils.throwIf(!saveResult, ErrorCode.SYSTEM_ERROR, "图表保存失败");

    // 在最终的返回结果前提交一个任务
    // todo 建议处理任务队列满了后,抛异常的情况(因为提交任务报错了,前端会返回异常)
    CompletableFuture.runAsync(() -> {
        // 先修改图表任务状态为 “执行中”。等执行成功后,修改为 “已完成”、保存执行结果;执行失败后,状态修改为 “失败”,记录任务失败信息。(为了防止同一个任务被多次执行)
        Chart updateChart = new Chart();
        updateChart.setId(chart.getId());
        // 把任务状态改为执行中
        updateChart.setStatus("running");
        boolean b = chartService.updateById(updateChart);
        // 如果提交失败(一般情况下,更新失败可能意味着你的数据库出问题了)
        if (!b) {
            handleChartUpdateError(chart.getId(), "更新图表执行中状态失败");
            return;
        }

        // 调用 AI
        String result = aiManager.doChat(biModelId, userInput.toString());
        String[] splits = result.split("【【【【【");
        if (splits.length < 3) {
            handleChartUpdateError(chart.getId(), "AI 生成错误");
            return;
        }
        String genChart = splits[1].trim();
        String genResult = splits[2].trim();
        // 调用AI得到结果之后,再更新一次
        Chart updateChartResult = new Chart();
        updateChartResult.setId(chart.getId());
        updateChartResult.setGenChart(genChart);
        updateChartResult.setGenResult(genResult);
        updateChartResult.setStatus("succeed");
        boolean updateResult = chartService.updateById(updateChartResult);
        if (!updateResult) {
            handleChartUpdateError(chart.getId(), "更新图表成功状态失败");
        }
    },threadPoolExecutor);

    BiResponse biResponse = new BiResponse();
//        biResponse.setGenChart(genChart);
//        biResponse.setGenResult(genResult);
    biResponse.setChartId(chart.getId());
    return ResultUtils.success(biResponse);
}
// 上面的接口很多用到异常,直接定义一个工具类
private void handleChartUpdateError(long chartId, String execMessage) {
    Chart updateChartResult = new Chart();
    updateChartResult.setId(chartId);
    updateChartResult.setStatus("failed");
    updateChartResult.setExecMessage(execMessage);
    boolean updateResult = chartService.updateById(updateChartResult);
    if (!updateResult) {
        log.error("更新图表失败状态失败" + chartId + "," + execMessage);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1658444.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

phpstudy(MySQL启动又立马停止)问题的解决办法

方法一&#xff1a;查看本地安装的MySQL有没有启动 1.鼠标右击开始按钮选择计算机管理 2.点击服务和应用程序 3.找到服务双击 4.找到MySQL服务 5.双击查看是否启动&#xff0c;如启动则停止他&#xff0c;然后确定&#xff0c;重新打开phpstudy,启动Mysql. 方法二&#xff…

OpenHarmony 实战开发——3.1 Release + Linux 原厂内核Launcher起不来问题分析报告

1、关键字 Launcher 无法启动&#xff1b;原厂内核&#xff1b;Access Token ID&#xff1b; 2、问题描述 芯片&#xff1a;rk3566&#xff1b;rk3399 内核版本&#xff1a;Linux 4.19&#xff0c;是 RK 芯片原厂发布的 rk356x 4.19 稳定版内核 OH 版本&#xff1a;OpenHa…

net7部署经历

1、linux安装dotnet命令&#xff1a; sudo yum install dotnet-sdk-7.0 或者直接在商店里安装 2、配置反向代理 127.0.0.1:5000》localhost 访问后报错 原因&#xff1a;数据表驼峰名&#xff0c; 在windows的数据表不区分大小写&#xff0c;但是在linux里面是默认区分的&…

xiuno(修罗)知乎模板二开优化魔板仿网盘资源社–模板加全套插件

使用说明 以服务器为例搭建教程 ①先安装 PHP7.1 版本 再安装数据库 Mysql ②解压文件&#xff1a;xiunobbs_4.0.4&#xff08;解压到根目录&#xff09;.zip ③解压②完成后找到【plugin】文件夹再解压&#xff1a;plugin(解压到 plugin 文件夹).zip 设置伪静态代码在上面&am…

记录如何查询域名txt解析是否生效

要查询域名的TXT记录&#xff0c;可以使用nslookup命令。具体步骤如下&#xff1a;12 打开命令行终端。输入命令 nslookup -qttxt 域名&#xff0c;将"域名"替换为你要查询的实际域名。执行命令后&#xff0c;nslookup会返回域名的TXT记录值。 如何查询域名txt解析是…

【C++后端项目】负载均衡OJ服务器

文章目录 一、演示项目二、所用技术与开发环境所用技术开发环境 三、项目宏观结构I. 风格&#xff1a;仿leetcodeII. 结构&#xff1a;Browser-Server模式III. 编写思路&#xff1a;编译服务 -> OJ服务 -> 前端设计 四、关于Git分支管理✨4.1 Git 分支结构4.2 Git 分支命…

【linux】主分区,扩展分区,逻辑分区,动态分区,引导分区,标准分区

目录 主分区&#xff0c;扩展分区&#xff0c;逻辑分区 主分区和引导分区 主分区&#xff0c;扩展分区&#xff0c;逻辑分区&#xff08;标准分区&#xff09; 硬盘一般划分为一个“主分区”和“扩展分区”&#xff0c;然后在扩展分区上再分成数个逻辑分区。 磁盘主分区扩展…

调用 gradio 创建聊天网页报错(使用远程服务器)

文章目录 写在前面1、使用默认IP地址&#xff08;失败&#xff09;2、使用本地IP地址&#xff08;失败&#xff09;3、使用远程服务器IP地址&#xff08;成功&#xff09; 写在前面 我复现了github上的 llama-chinese 的工作 使用的是 llama2&#xff0c;环境配置是在远程服务…

如何使用 ArcGIS Pro 计算容积率

容积率是指地上建筑物的总面积与用地面积的比率&#xff0c;数值越小越舒适&#xff0c;这里为大家介绍一下如何使用ArcGIS Pro 计算容积率&#xff0c;希望能对你有所帮助。 数据来源 教程所使用的数据是从水经微图中下载的建筑和小区数据&#xff0c;除了建筑和小区数据&am…

智能合约是什么?搭建与解析

智能合约是一种基于区块链技术的自动化执行合约&#xff0c;它通过编程语言编写&#xff0c;并在区块链网络上部署运行。智能合约是区块链技术的重要组成部分&#xff0c;它使得去中心化应用&#xff08;DApp&#xff09;的开发变得更加便捷和高效。本文将从智能合约的搭建、原…

如何解决 NPM依赖下载超时问题 :npm ERR! network timeout at: https://registry.npmjs.org/猫头虎

如何解决 NPM依赖下载超时问题 &#xff1a;npm ERR! network timeout at: https://registry.npmjs.org/猫头虎 博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试…

正交频分复用回顾(通俗易懂)

OFDM我们知道&#xff0c;叫做正交频分复用&#xff0c;它是4G的一个关键技术&#xff0c;4G的多址技术叫做OFDMA&#xff0c;也就是说4G是通过OFDM来作用户区分的&#xff0c;具体是什么意思呢&#xff1f;继续往下看。 图1 在2G和3G时代&#xff0c; 单用户都是用的一个载波…

Golang——Strconv包

func ParseBool(str string) (value bool, err error) strconv包实现了基本数据类型与其字符串表示的转换&#xff0c;主要有以下常用函数&#xff1a;Atoi()&#xff0c;Itoa()&#xff0c;parse系列函数&#xff0c;format系列函数&#xff0c;append系列函数。 1.1 string与…

休斯《公共管理导论》第4版教材精讲视频网课+考研真题讲解

内容简介 本课程是休斯《公共管理导论》&#xff08;第4版&#xff09;精讲班&#xff0c;为了帮助参加研究生招生考试指定考研参考书目为休斯《公共管理导论》&#xff08;第4版&#xff09;的考生复习专业课&#xff0c;我们根据教材和名校考研真题的命题规律精心讲解教材章节…

防爆手机在石油化工行业中的作用是什么?

在石油化工行业这一高风险领域中&#xff0c;安全始终被置于首要位置。而在这样的环境中&#xff0c;通信设备的选择尤为关键。防爆手机&#xff0c;作为专为危险环境设计的通信设备&#xff0c;其在石油化工行业中的作用不容忽视。 它不仅能在易燃易爆的复杂环境中稳定运行&am…

攻防世界PHP2

1、打开靶机链接http://61.147.171.105:49513/&#xff0c;没有发现任何线索 2、尝试访问http://61.147.171.105:49513/index.php&#xff0c;页面没有发生跳转 3、尝试将访问 尝试访问http://61.147.171.105:49513/index.phps index.php 和 index.phps 文件之间的主要区别在于…

libcity 笔记:添加自定义dataset

假设我们把libcity/data/dataset/trajectory_dataset.py复制一份到libcity/data/dataset/dataset_subclass/GeolifeDM_dataset.py&#xff0c;里面内容不变&#xff0c;只是把class的名字换了 那其他需要修改哪些内容&#xff0c;使得这个dataset生效呢 libcity/data/dataset/d…

【Vue】vue中将 html 或者 md 导出为 word 文档

原博主 xh-htmlword文档 感谢这位大佬的封装优化和分享&#xff0c;亲测有用&#xff01;可以去看大佬&#x1f447;的说明&#xff01; 前端HTML转word文档&#xff0c;绝对有效&#xff01;&#xff01;&#xff01; 安装 npm install xh-htmlword导入 import handleEx…

uniapp开发的小程序toast被键盘遮挡提示内容无法完全显示问题解决

文章目录 问题描述问题解决参考链接&#xff1a; 问题描述 在开发抖音小程序后&#xff0c;当用户提交反馈后&#xff0c;调用了系统的toast来显示是否提交成功&#xff0c;结果被系统的键盘给盖住&#xff0c;无法显示完全。 即&#xff0c;简单来说&#xff1a;Toast会被弹…

DI/DO/AI/AO混合分布式BACnet IO控制器助力智慧城市

智慧城市建设浪潮中&#xff0c;钡铼电子的BL207 BACnet边缘计算远程I/O控制器正以其独特的技术优势&#xff0c;成为推动城市智能化转型的关键力量。智慧城市不仅仅是概念上的创新&#xff0c;它需要坚实的技术支撑来实现资源的高效利用、环境的可持续发展以及居民生活的便捷与…