用spring-webmvc包实现AI(Deepseek)事件流(SSE)推送

news2025/4/6 23:17:05

 前后端:  Spring Boot + Angular

spring-webmvc-5.2.2包

代码片段如下:

控制层:

@GetMapping(value = "/realtime/page/ai/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    @ApiOperation(value = "获取告警记录进行AI分析")
    public SseEmitter getRealTimeAlarmAi(AlarmRecordQueryParam queryParam) {
        final DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
        IPage<AlarmRecord> page = alarmRecordService.findRealTimeByParam(queryParam);
        StringBuilder alarmInfo = new StringBuilder();
        try {
            // 根据状态设置前缀
            String prefix = queryParam.getStatus() == 1 ?
                    "最近十条历史告警记录:" : "当前十条实时告警信息:";
            String emptyMessage = queryParam.getStatus() == 1 ?
                    "暂无历史告警" : "当前无实时告警";
            if (page.getRecords() != null && !page.getRecords().isEmpty()) {
                alarmInfo.append(prefix);
                sseService.buildAlarmContent(page, alarmInfo, timeFormatter);
            } else {
                alarmInfo.append(emptyMessage);
            }
            sseService.validatePromptLength(alarmInfo, maxPromptLength);
        } catch (Exception e) {
            log.error("告警信息处理异常", e);
        }
        return sseService.createStreamConnection(alarmInfo.toString(), "告警");
    }
    @ApiOperation("查询图表数据用AI分析数据详情")
    @GetMapping("/chart/ai/sse")
    @OpLog(
            inputExpression = "开始时间:{#queryParam.startTime},结束时间:{#queryParam.endTime},图表ID:{#queryParam.chartId}",
            outputExpression = "{#code}"
    )
    public SseEmitter chartAiSSEData(@Validated ChartDataQueryParam queryParam) throws Exception {
        String ChartAi = "报表";
        ChartInstance chart = Optional.ofNullable(chartService.getById(queryParam.getChartId()))
                .orElseThrow(() -> new Exception("找不到:" + queryParam.getChartId() + "的图表定义"));
        List<ChartDeviceSensor> deviceSensors = ChartInstance.toChartDeviceSensorList(chart);
        String endTime = DataQueryParam.endTime(queryParam.getEndTime());
        DataQueryParam dataQueryParam = new DataQueryParam(queryParam.getStartTime(), endTime, deviceSensors);
        IChartDataService chartDataService = chartDataServiceManager.getInstance(chart.getChartTypeId());
        List dataList = chartDataService.getChartData(dataQueryParam);
        List<String> times = dataQueryParam.getDateType().getTimes(dataQueryParam.getStartTime(), dataQueryParam.getEndTime());
        ChartData chartData = new ChartData<>(chart.getId(), chart.getName(), chart.getChartFormat(), chart.getChartTypeId(), chart.getShowType(), chart.getCategoryId(), times, dataList);
        // 将 ChartData 转换为压缩字符串
        String csvData = ChartDataFormatter.formatChartData(chartData);
        log.info("当前请求字符长度:" + csvData.length());
        try {
            if (csvData.length() > maxPromptLength) {
                OpLogAspect.setCode(400); // 设置错误码
                throw new IllegalArgumentException("数据长度超过限制,最大允许长度:" + maxPromptLength);
            }
            OpLogAspect.setCode(200);
            return sseService.createStreamConnection(csvData,ChartAi);
        } catch (IllegalArgumentException e) {
            OpLogAspect.setCode(400); // 参数错误
            throw e;
        } catch (Exception e) {
            OpLogAspect.setCode(500); // 系统错误
            throw new RuntimeException("处理请求失败", e);
        }
    }

业务层代码:

package com.keydak.project.core.chart.ai.service.impl;

import cn.hutool.core.lang.UUID;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.keydak.project.core.alarm.data.vo.AlarmRecord;
import com.keydak.project.core.chart.ai.dto.KeydakAiConfigDTO;
import com.keydak.project.core.chart.ai.exception.BalanceException;
import com.keydak.project.core.chart.ai.service.SSEService;
import com.keydak.repository.core.enums.SystemGlobalConfigEnum;
import com.keydak.repository.core.service.ISystemGlobalConfigService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import com.fasterxml.jackson.core.type.TypeReference;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * AI服务实现
 *
 * @author xyt
 */
@Service
@Slf4j
public class SSEServiceImpl implements SSEService {

    @Autowired
    private ISystemGlobalConfigService systemGlobalConfigService;


    private final ObjectMapper objectMapper = new ObjectMapper();
    private RateLimiter rateLimiter;

    @PostConstruct
    public void init() {
        try {
            // 初始化限流器时动态获取配置
            KeydakAiConfigDTO initialConfig = getConfig();
            rateLimiter = new RateLimiter(initialConfig.getRateLimit());
        } catch (Exception e) {
            throw new RuntimeException("初始化失败,无法获取Keydak AI配置", e);
        }
    }

    // 线程池配置
    private static final int CORE_POOL_SIZE = 5; // 核心线程数
    private static final int MAX_POOL_SIZE = 8; // 最大线程数
    private static final long KEEP_ALIVE_TIME = 30;  // 线程空闲时间
    private static final int QUEUE_CAPACITY = 30; //队列
    private final ExecutorService executor = new ThreadPoolExecutor(
            CORE_POOL_SIZE,
            MAX_POOL_SIZE,
            KEEP_ALIVE_TIME,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(QUEUE_CAPACITY),
            new ThreadPoolExecutor.AbortPolicy() // 使用 AbortPolicy 直接拒绝任务而不执行
    );

    /**
     * 刷新限流器配置
     */
    @Override
    public synchronized void refreshRateLimiter(Integer rate) {
        try {
            if (rateLimiter == null) {
                rateLimiter = new RateLimiter(rate);
            } else {
                rateLimiter.updateRate(rate);
            }
            log.info("限流器已更新,新速率限制: {}", rate);
        } catch (Exception e) {
            log.error("刷新限流器配置失败", e);
        }
    }

    @PreDestroy
    public void destroy() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * 获取Keydak AI配置信息。
     *
     * @return 返回Keydak AI配置信息的DTO对象
     */
    private KeydakAiConfigDTO getConfig() throws Exception {
        KeydakAiConfigDTO config = systemGlobalConfigService.getTag(
                SystemGlobalConfigEnum.KEYDAK_AI_CONFIG,
                KeydakAiConfigDTO.class
        );

        if (config == null) {
            throw new Exception("Keydak AI配置不存在");
        }

        return config;
    }


    @Override
    public SseEmitter createStreamConnection(String message, String aiType) {
        SseEmitter emitter = new SseEmitter(120_000L); // 2分钟超时

        try {
            KeydakAiConfigDTO config = getConfig();
            double balance = getBalance();
            log.info("当前余额: {} 元", balance);
            log.warn("当前可用令牌数: {}", rateLimiter.tokens.get());
            if (!rateLimiter.tryAcquire()) {
                log.warn("请求被限流 | 当前允许的QPS:{}", config.getRateLimit());
                handleRateLimitError(emitter);
                return emitter;
            }
        } catch (BalanceException e) {
            handleBalanceError(emitter, e.getMessage());
            return emitter;
        } catch (Exception e) {
            handleBalanceError(emitter, "系统错误: " + e.getMessage());
            return emitter;
        }

        // 保持原有事件监听
        emitter.onCompletion(() -> log.info("SSE连接完成"));
        emitter.onTimeout(() -> {
            log.warn("SSE连接超时");
            rateLimiter.refill(); // 超时请求返还令牌
        });
        emitter.onError(e -> log.error("SSE连接错误", e));

        // 保持原有线程池处理
        executor.execute(() -> {
            try {
                processSSEStream(message, aiType, emitter);
            } catch (Exception e) {
                emitter.completeWithError(e);
            }
        });

        return emitter;
    }

    /**
     * 新增限流错误处理方法
     *
     * @param emitter 事件发射器
     * @throws IOException 如果发送失败
     */
    private void handleRateLimitError(SseEmitter emitter) {
        try {
            Map<String, Object> error = new LinkedHashMap<>();
            error.put("error", "rate_limit_exceeded");
            error.put("message", "请求过于频繁,请稍后再试");
            error.put("timestamp", System.currentTimeMillis());

            emitter.send(SseEmitter.event()
                    .data(objectMapper.writeValueAsString(error))
                    .name("rate-limit-error")
                    .reconnectTime(5000L));

            emitter.complete();
        } catch (IOException e) {
            log.error("发送限流错误失败", e);
        }
    }

    private void handleBalanceError(SseEmitter emitter, String errorMsg) {
        try {
            JSONObject error = new JSONObject();
            error.put("error", "balance_insufficient");
            error.put("message", errorMsg);
            emitter.send(SseEmitter.event()
                    .data(error.toJSONString())
                    .name("balance-error"));
            emitter.complete();
        } catch (Exception e) {
            log.error("发送余额错误信息失败", e);
        }
    }


    private void processSSEStream(String message, String aiType, SseEmitter emitter) throws Exception {
        HttpURLConnection connection = null;
        try {
            connection = createConnection();
            String jsonBody = buildRequestBody(message, aiType);
            log.info("发送AI请求数据: {}", jsonBody); // 记录请求体
            sendRequestData(connection, jsonBody);
            validateResponse(connection);
            try (InputStream inputStream = connection.getInputStream();
                 BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    if (Thread.currentThread().isInterrupted()) {
                        throw new InterruptedException("处理被中断");
                    }
                    if (line.startsWith("data: ")) {
                        String jsonData = line.substring(6).trim();
                        log.debug("AI响应数据: {}", jsonData);
                        if ("[DONE]".equals(jsonData)) {
                            log.info("收到流结束标记");
                            sendCompletionEvent(emitter);  // 发送完成事件
                            break;  // 结束循环
                        }
                        try {
                            processStreamData(emitter, jsonData);
                        } catch (Exception e) {
                            log.error("数据处理失败,终止连接", e);
                            emitter.completeWithError(e);
                            break;
                        }
                    }
                }
            }
        } catch (Exception e) {
            log.error("SSE处理发生异常", e);
            throw e;
        } finally {
            if (connection != null) connection.disconnect();
        }
    }

    private void processStreamData(SseEmitter emitter, String jsonData) throws Exception {
        try {
            Map<String, Object> apiResponse = objectMapper.readValue(
                    jsonData,
                    new TypeReference<Map<String, Object>>() {
                    }
            );

            List<Map<String, Object>> choices = (List<Map<String, Object>>) apiResponse.get("choices");
            if (choices == null || choices.isEmpty()) return;

            Map<String, Object> choice = choices.get(0);
            Map<String, Object> delta = (Map<String, Object>) choice.get("delta");

            Map<String, Object> chunk = new LinkedHashMap<>();
            chunk.put("timestamp", System.currentTimeMillis());
            chunk.put("messageId", UUID.randomUUID().toString());

            // 处理思考过程
            if (delta.containsKey("reasoning_content")) {
                String reasoning = (String) delta.get("reasoning_content");
                if (reasoning != null && !reasoning.trim().isEmpty()) {
                    chunk.put("type", "reasoning");
                    chunk.put("content", reasoning);
                    sendChunk(emitter, chunk);
                }
            }

            // 处理正式回答
            if (delta.containsKey("content")) {
                String content = (String) delta.get("content");
                if (content != null) {
                    chunk.put("type", "answer");
                    chunk.put("content", content);
                    sendChunk(emitter, chunk);
                }
            }

        } catch (JsonProcessingException e) {
            log.error("JSON解析失败 | 原始数据: {} | 错误: {}", jsonData, e.getMessage());
            throw new IOException("Failed to process stream data", e);
        } catch (ClassCastException e) {
            log.error("数据结构类型错误 | 原始数据: {} | 错误: {}", jsonData, e.getMessage());
            throw new IllegalStateException("Invalid data structure", e);
        } catch (Exception e) {
            log.error("处理数据块时发生未知错误 | 原始数据: {}", jsonData, e);
            throw e;
        }
    }

    private void sendChunk(SseEmitter emitter, Map<String, Object> chunk) throws IOException {
        String chunkJson = objectMapper.writeValueAsString(chunk);
        log.debug("发送数据块: {}", chunkJson);

        SseEmitter.SseEventBuilder event = SseEmitter.event()
                .data(chunkJson)
                .id(UUID.randomUUID().toString())
                .name("ai-message")
                .reconnectTime(5000L);

        emitter.send(event);
    }


    private void sendCompletionEvent(SseEmitter emitter) {
        try {
            Map<String, Object> completionEvent = new LinkedHashMap<>();
            completionEvent.put("event", "done");
            completionEvent.put("timestamp", System.currentTimeMillis());
            completionEvent.put("messageId", UUID.randomUUID().toString());

            String eventJson = objectMapper.writeValueAsString(completionEvent);

            emitter.send(SseEmitter.event()
                    .data(eventJson)
                    .id("COMPLETION_EVENT")
                    .name("stream-end")
                    .reconnectTime(0L));  // 停止重连

            log.info("已发送流结束事件");
        } catch (IOException e) {
            log.error("发送完成事件失败", e);
        } finally {
            emitter.complete();
            log.info("SSE连接已关闭");
        }
    }

    private HttpURLConnection createConnection() throws Exception {
        KeydakAiConfigDTO config = getConfig();
        HttpURLConnection connection = (HttpURLConnection) new URL(config.getUrl()).openConnection();
        connection.setRequestMethod("POST");
        connection.setDoOutput(true);
        connection.setRequestProperty("Content-Type", "application/json");
        connection.setRequestProperty("Authorization", "Bearer " + config.getKey());
        connection.setRequestProperty("Accept", "text/event-stream");
        connection.setConnectTimeout(30_000);
        connection.setReadTimeout(120_000);
        return connection;
    }

    private void sendRequestData(HttpURLConnection connection, String jsonBody) throws Exception {
        try (OutputStream os = connection.getOutputStream()) {
            os.write(jsonBody.getBytes(StandardCharsets.UTF_8));
            os.flush();
        }
    }

    private void validateResponse(HttpURLConnection connection) throws Exception {
        if (connection.getResponseCode() != 200) {
            String errorMsg = readErrorStream(connection);
            throw new RuntimeException("API请求失败: " + connection.getResponseCode() + " - " + errorMsg);
        }
    }

    private String readErrorStream(HttpURLConnection connection) throws IOException {
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(connection.getErrorStream(), StandardCharsets.UTF_8))) {
            StringBuilder response = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                response.append(line);
            }
            return response.toString();
        }
    }

    private String buildRequestBody(String userMessage, String aiType) throws IOException {
        KeydakAiConfigDTO config = null;
        try {
            config = getConfig();
        } catch (Exception e) {
            e.printStackTrace();
        }

        Map<String, Object> request = new HashMap<>();
        request.put("model", config.getModelType());
        request.put("stream", true);

        List<Map<String, String>> messages = new ArrayList<>();
        Map<String, String> message = new HashMap<>();
        message.put("role", "user");
        if ("报表".equals(aiType)) {
            //报表提问词
            message.put("content", buildPrompt(config.getPrompt(), userMessage));
        } else {
            //告警提问词
            message.put("content", buildPrompt(config.getPromptAlarm(), userMessage));
        }

        messages.add(message);

        request.put("messages", messages);

        return objectMapper.writeValueAsString(request);
    }

    private String buildPrompt(String basePrompt, String userMessage) {
        return String.format("%s\n%s\n", basePrompt, userMessage);
    }


    /**
     * 查询当前余额
     *
     * @return 当前余额
     * @throws IOException 如果请求失败
     */
    @Override
    @SneakyThrows
    public double getBalance() {
        HttpURLConnection connection = null;
        try {
            KeydakAiConfigDTO config = getConfig();
            URL url = new URL(config.getBalanceUrl());
            connection = (HttpURLConnection) url.openConnection();
            connection.setRequestMethod("GET");
            connection.setRequestProperty("Authorization", "Bearer " + config.getKey());
            connection.setConnectTimeout(5000);
            connection.setReadTimeout(5000);

            int responseCode = connection.getResponseCode();
            if (responseCode != 200) {
                String errorBody = readErrorStream(connection); // 复用已有的错误流读取方法
                throw new IOException("HTTP Error: " + responseCode + " - " + errorBody);
            }

            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
                StringBuilder response = new StringBuilder();
                String line;
                while ((line = reader.readLine()) != null) {
                    response.append(line);
                }

                JSONObject jsonObject = JSON.parseObject(response.toString());
                // 以下解析逻辑保持原样
                if (!jsonObject.containsKey("is_available") || !jsonObject.containsKey("balance_infos")) {
                    throw new IOException("Invalid balance response format");
                }
                JSONArray balanceInfos = jsonObject.getJSONArray("balance_infos");
                if (jsonObject.getBoolean("is_available") && balanceInfos != null && !balanceInfos.isEmpty()) {
                    JSONObject balanceInfo = balanceInfos.getJSONObject(0);
                    if (!balanceInfo.containsKey("total_balance")) {
                        throw new IOException("Missing total_balance field");
                    }
                    return balanceInfo.getDouble("total_balance");
                } else {
                    throw new IOException("Balance information is not available");
                }
            }
        } finally {
            if (connection != null) {
                connection.disconnect();
            }
        }
    }

    /**
     * 限流器实现
     **/
    private static class RateLimiter {
        private volatile int capacity;
        private final AtomicInteger tokens;
        private volatile long lastRefillTime;
        private final Object lock = new Object();

        RateLimiter(int rate) {
            this.capacity = rate;
            this.tokens = new AtomicInteger(rate);
            this.lastRefillTime = System.currentTimeMillis();
        }

        public void refill() {
            synchronized (lock) {
                long now = System.currentTimeMillis();
                long elapsed = now - lastRefillTime;
                if (elapsed >= 1000) {
                    tokens.set(capacity); // 直接重置为最大容量
                    lastRefillTime = now;
                }
            }
        }

        public boolean tryAcquire() {
            synchronized (lock) {
                refill();
                if (tokens.get() > 0) {
                    tokens.decrementAndGet();
                    return true;
                }
                return false;
            }
        }

        public void updateRate(int newRate) {
            synchronized (lock) {
                this.capacity = newRate;
                tokens.set(Math.min(tokens.get(), newRate));
                lastRefillTime = System.currentTimeMillis();
            }
        }
    }


    /**
     * 告警内容构建方法
     **/
    @Override
    public void buildAlarmContent(IPage<AlarmRecord> page,
                                  StringBuilder alarmInfo,
                                  DateTimeFormatter formatter) {
        page.getRecords().forEach(record -> {
            // 时间格式化(使用首次告警时间)
            String time = Optional.ofNullable(record.getFirstTime())
                    .map(t -> t.format(formatter))
                    .orElse("时间未知");
            // 设备名称空值处理
            String device = StringUtils.defaultString(record.getDeviceName(), "未知设备");
            // 状态/数值处理逻辑
            String state = resolveStateValue(record);
            // 告警描述处理
            String desc = StringUtils.defaultString(record.getContent(), "未知告警类型");
            // 按规范格式拼接
            alarmInfo.append(
                    String.format("%s %s %s %s;", time, device, state, desc)
            );
        });
    }

    /**
     * 状态值解析方法
     */
    private String resolveStateValue(AlarmRecord record) {
        if (record.getValue() != null) {
            return record.getValue().stripTrailingZeros().toPlainString();
        }
        return record.getStatus() != null ?
                (record.getStatus() ? "1" : "0") : "状态未知";
    }

    /**
     * 长度校验方法
     **/
    @Override
    public void validatePromptLength(StringBuilder content, int maxLength) {
        if (content.length() > maxLength) {
            throw new IllegalArgumentException("告警数据过长,请缩小查询范围");
        }
    }

}

前端代码:

<div class="modal-area">
    <form name="formNg" novalidate>
        <div class="modal-header">
            <h3 class="modal-title" style="color: #FFFFFF">
                AI分析
            </h3>
        </div>
        <div class="modal-body">
            <div class="form">
                <!-- 加载状态 - 修改为动态效果 -->
                <div ng-if="connectionStatus === 'connecting'" class="loading">
                    <div class="ai-thinking-container">
                        <span>AI思考中</span>
                        <div class="ai-typing-indicator">
                            <div class="typing-dot"></div>
                            <div class="typing-dot"></div>
                            <div class="typing-dot"></div>
                        </div>
                    </div>
                </div>

                <!-- 思考过程 -->
                <div class="thinking-panel" ng-if="thinkingContent">
                    <div class="thinking-header">
                        <i class="fa fa-brain"></i> 思考过程
                        <!-- 总用时显示(完成后保留) -->
                        <span ng-if="thinkingTime">({{thinkingTime}}秒)</span>
                    </div>
                    <div class="thinking-content"
                         ng-bind-html="thinkingContent"
                         scroll-to-bottom="thinkingContent">
                    </div>
                </div>

                <!-- 正式回答 -->
                <div class="answer-panel" ng-if="answerContent">
                    <div class="answer-header">
                        <i class="fa fa-comment"></i> 以下是AI的分析
                    </div>
                    <div class="answer-content"
                         ng-bind-html="answerContent"
                         scroll-to-bottom="answerContent">
                    </div>
                </div>

                <!-- 错误提示 -->
                <div ng-if="connectionStatus === 'error'" class="alert alert-danger">
                    <i class="fa fa-exclamation-triangle"></i> 连接异常,请尝试重新分析
                </div>
            </div>
        </div>
        <div class="modal-footer">
            <button ng-click="retry()"
                    class="btn btn-warning"
                    ng-disabled="connectionStatus === 'connecting'">
                <i class="fa fa-redo"></i> 重新分析
            </button>
            <button ng-click="cancel()" class="btn btn-danger">
                <i class="fa fa-times"></i> 关闭
            </button>
        </div>
    </form>
</div>

<style>

    .thinking-header span {
        margin-left: 5px;
        font-size: 0.9em;
        opacity: 0.8;
        color: #a0c4ff;
    }

    .modal-body {
        height: 6rem; /* 设置固定高度 */
        overflow-y: auto; /* 内容超出时显示滚动条 */
        padding: 15px;
        background: #1B448A; /* 背景改为蓝色 */
        border-radius: 4px;
        font-family: 'Consolas', monospace;
        color: #FFFFFF;
    }

    /* 思考过程样式 */
    .thinking-panel {
        margin-bottom: 20px;
        border-left: 3px solid #4a90e2;
        padding-left: 15px;
    }

    .thinking-header {
        color: #4a90e2;
        font-size: 16px;
        margin-bottom: 10px;
    }

    .thinking-content {
        background: rgba(255, 255, 255, 0.05);
        padding: 12px;
        border-radius: 4px;
        color: #e0e0e0;
        line-height: 1.6;
    }

    /* 正式回答样式 */
    .answer-panel {
        margin-top: 25px;
        border-top: 1px solid #00c85333;
        padding-top: 15px;
    }

    .answer-header {
        color: #00c853;
        font-size: 16px;
        margin-bottom: 10px;
    }

    .answer-content {
        background: rgba(255, 255, 255, 0.05);
        padding: 12px;
        border-radius: 4px;
        color: #ffffff;
        line-height: 1.6;
    }

    /* 图标样式 */
    .fa-brain {
        color: #4a90e2;
        margin-right: 8px;
    }

    .fa-comment {
        color: #00c853;
        margin-right: 8px;
    }

    /* 新的加载动画样式 */
    .loading {
        color: #FFF;
        text-align: left;
        padding: 15px;
        font-size: 16px;
    }

    .ai-thinking-container {
        display: flex;
        align-items: center;
        gap: 8px;
    }

    .ai-typing-indicator {
        display: flex;
        align-items: center;
        gap: 4px;
        height: 20px;
    }

    .typing-dot {
        width: 8px;
        height: 8px;
        background-color: #FFFFFF;
        border-radius: 50%;
        opacity: 0.4;
        animation: typing-animation 1.4s infinite ease-in-out;
    }

    .typing-dot:nth-child(1) {
        animation-delay: 0s;
    }

    .typing-dot:nth-child(2) {
        animation-delay: 0.2s;
    }

    .typing-dot:nth-child(3) {
        animation-delay: 0.4s;
    }

    @keyframes typing-animation {
        0%, 60%, 100% {
            transform: translateY(0);
            opacity: 0.4;
        }
        30% {
            transform: translateY(-5px);
            opacity: 1;
        }
    }
</style>
// 报表分析
UI.Controllers.controller("AiTipsCtrl", [
    "$scope", "$sce", "$uibModalInstance", "parent", "SSEService", "$timeout",
    function($scope, $sce, $uibModalInstance, parent, SSEService, $timeout) {
        // 状态管理
        $scope.connectionStatus = 'connecting'; // connecting | connected | error | completed
        $scope.thinkingContent = null;
        $scope.answerContent = null;
        $scope.thinkingTime = null; // 新增:思考时间变量
        $scope.startTime = null; // 新增:开始时间戳

        let eventSource = null;
        let thinkingBuffer = "";
        let answerBuffer = "";

        // 自动滚动指令
        $scope.scrollToBottom = function() {
            $timeout(() => {
                const container = document.querySelector('.modal-body');
                if (container) {
                    container.scrollTop = container.scrollHeight + 120;
                }
            }, 50);
        };

        // 内容更新方法
        function processChunkData(data) {
            if (data.type === 'reasoning') {
                // 如果是第一条思考内容,记录开始时间
                if (!thinkingBuffer && !$scope.startTime) {
                    $scope.startTime = new Date().getTime();
                }
                thinkingBuffer += data.content;
                $scope.thinkingContent = $sce.trustAsHtml(
                    thinkingBuffer.replace(/\n/g, '<br/>').replace(/ {2}/g, ' &nbsp;')
                );
                // 更新思考时间
                updateThinkingTime();
            }
            else if (data.type === 'answer') {
                answerBuffer += data.content;
                $scope.answerContent = $sce.trustAsHtml(
                    answerBuffer.replace(/\n/g, '<br/>').replace(/ {2}/g, ' &nbsp;')
                );
            }
            $scope.scrollToBottom();
        }

        function updateThinkingTime() {
            if ($scope.startTime) {
                const currentTime = new Date().getTime();
                $scope.thinkingTime = ((currentTime - $scope.startTime) / 1000).toFixed(1);
            }
        }


        // 初始化SSE连接
        function initSSE() {
            const url = '/data/chart/ai/sse?' + $.param(parent.queryParam);
            eventSource = new EventSource(url);

            eventSource.onopen = () => {
                $scope.$apply(() => {
                    $scope.connectionStatus = 'connected';
                });
            };

            // 处理消息事件
            eventSource.addEventListener('ai-message', e => {
                $scope.$apply(() => {
                    try {
                        const data = JSON.parse(e.data);
                        processChunkData(data);
                    } catch (err) {
                        console.error('消息解析错误:', err);
                        $scope.answerContent = $sce.trustAsHtml(
                            '<div class="text-danger">数据格式错误</div>'
                        );
                    }
                });
            });

            // 处理结束事件
            eventSource.addEventListener('stream-end', () => {
                $scope.$apply(() => {
                    $scope.connectionStatus = 'completed';
                    //最终更新一次思考时间
                    updateThinkingTime();
                    safeClose();
                });
            });

            // 错误处理
            eventSource.onerror = (err) => {
                $scope.$apply(() => {
                    console.error('SSE连接错误:', err);
                    $scope.connectionStatus = 'error';
                    safeClose();
                });
            };
        }

        // 安全关闭连接
        function safeClose() {
            if (eventSource) {
                eventSource.close();
                eventSource = null;
            }
        }

        // 重新尝试
        $scope.retry = () => {
            safeClose();
            thinkingBuffer = "";
            answerBuffer = "";
            $scope.thinkingContent = null;
            $scope.answerContent = null;
            $scope.thinkingTime = null; //重置思考时间
            $scope.startTime = null; //重置开始时间
            $scope.connectionStatus = 'connecting';
            initSSE();
        };

        // 关闭模态框
        $scope.cancel = () => {
            safeClose();
            $uibModalInstance.dismiss();
        };

        // 初始化
        initSSE();

        // 清理
        $scope.$on('$destroy', () => {
            safeClose();
        });
    }
]);
// 告警分析
UI.Controllers.controller("AiAlarmTipsCtrl", [
    "$scope", "$sce", "$uibModalInstance", "parent", "SSEService", "$timeout",
    function($scope, $sce, $uibModalInstance, parent, SSEService, $timeout) {
        // 状态管理
        $scope.connectionStatus = 'connecting'; // connecting | connected | error | completed
        $scope.thinkingContent = null;
        $scope.answerContent = null;
        $scope.thinkingTime = null; // 新增:思考时间变量
        $scope.startTime = null; // 新增:开始时间戳

        let eventSource = null;
        let thinkingBuffer = "";
        let answerBuffer = "";

        // 自动滚动指令
        $scope.scrollToBottom = function() {
            $timeout(() => {
                const container = document.querySelector('.modal-body');
                if (container) {
                    container.scrollTop = container.scrollHeight + 120;
                }
            }, 50);
        };

        // 内容更新方法
        function processChunkData(data) {
            if (data.type === 'reasoning') {
                // 如果是第一条思考内容,记录开始时间
                if (!thinkingBuffer && !$scope.startTime) {
                    $scope.startTime = new Date().getTime();
                }

                thinkingBuffer += data.content;
                $scope.thinkingContent = $sce.trustAsHtml(
                    thinkingBuffer.replace(/\n/g, '<br/>').replace(/ {2}/g, ' &nbsp;')
                );
                // 更新思考时间
                updateThinkingTime();
            }
            else if (data.type === 'answer') {
                answerBuffer += data.content;
                $scope.answerContent = $sce.trustAsHtml(
                    answerBuffer.replace(/\n/g, '<br/>').replace(/ {2}/g, ' &nbsp;')
                );
            }
            $scope.scrollToBottom();
        }

        // 更新思考时间
        function updateThinkingTime() {
            if ($scope.startTime) {
                const currentTime = new Date().getTime();
                $scope.thinkingTime = ((currentTime - $scope.startTime) / 1000).toFixed(1);
            }
        }

        // 初始化SSE连接
        function initSSE() {
            const url = '/alarm/record/realtime/page/ai/sse?' + $.param(parent.queryParam);
            eventSource = new EventSource(url);

            eventSource.onopen = () => {
                $scope.$apply(() => {
                    $scope.connectionStatus = 'connected';
                });
            };

            // 处理消息事件
            eventSource.addEventListener('ai-message', e => {
                $scope.$apply(() => {
                    try {
                        const data = JSON.parse(e.data);
                        processChunkData(data);
                    } catch (err) {
                        console.error('消息解析错误:', err);
                        $scope.answerContent = $sce.trustAsHtml(
                            '<div class="text-danger">数据格式错误</div>'
                        );
                    }
                });
            });

            // 处理结束事件
            eventSource.addEventListener('stream-end', () => {
                $scope.$apply(() => {
                    $scope.connectionStatus = 'completed';
                    // 最终更新一次思考时间
                    updateThinkingTime();
                    safeClose();
                });
            });

            // 错误处理
            eventSource.onerror = (err) => {
                $scope.$apply(() => {
                    console.error('SSE连接错误:', err);
                    $scope.connectionStatus = 'error';
                    safeClose();
                });
            };
        }

        // 安全关闭连接
        function safeClose() {
            if (eventSource) {
                eventSource.close();
                eventSource = null;
            }
        }

        // 重新尝试
        $scope.retry = () => {
            safeClose();
            thinkingBuffer = "";
            answerBuffer = "";
            $scope.thinkingContent = null;
            $scope.answerContent = null;
            $scope.thinkingTime = null; // 重置思考时间
            $scope.startTime = null; // 重置开始时间
            $scope.connectionStatus = 'connecting';
            initSSE();
        };

        // 关闭模态框
        $scope.cancel = () => {
            safeClose();
            $uibModalInstance.dismiss();
        };

        // 初始化
        initSSE();

        // 清理
        $scope.$on('$destroy', () => {
            safeClose();
        });
    }
]);

            showAiTips: function (resolve) {
                this.showDialog("Template/AiTips.html", "AiTipsCtrl", resolve, 600);
            },
            showAiAlarmTips: function (resolve) {
                this.showDialog("Template/AiAlarmTips.html", "AiAlarmTipsCtrl", resolve, 600);
            }


数据库结构:

INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'balanceUrl', 'https://api.deepseek.com/user/balance', '余额查询');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'enable', 'true', '启用AI报表助手');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'key', '', 'API密钥');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'modelType', 'deepseek-reasoner', 'deepseek模型类型');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'prompt', '', 'AI提问词');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'promptAlarm', '', 'AI提问词');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'rateLimit', '3', '限制每秒多少次请求');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'url', 'https://api.deepseek.com/v1/chat/completions', 'API接口');

实体类(使用AES加密 密钥):

package com.keydak.project.core.chart.ai.dto;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.util.*;

/**
 * AI配置信息
 *
 * @author xyt
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class KeydakAiConfigDTO {

    private Boolean enable;
    /**
     * API_URL地址
     **/
    private String url;
    /**
     * 查询余额地址
     **/
    private String balanceUrl;
    /**
     * API密钥
     **/
    private String key;
    /**
     * 限流次数
     **/
    private Integer rateLimit;
    /**
     * AI提问词(报表)
     **/
    private String prompt;

    /**
     * AI提问词(告警)
     **/
    private String promptAlarm;

    /**
     * 模型类型
     **/
    private String modelType;


    private static final String SALT = ""; // 16 bytes for AES-128
    private static final String ALGORITHM = "AES/ECB/PKCS5Padding";

    public void validate() {
        List<String> missingFields = new ArrayList<>();
        if (url == null) missingFields.add("API_URL地址");
        if (balanceUrl == null) missingFields.add("查询余额地址");
        if (key == null) missingFields.add("API密钥");
        if (rateLimit == null) missingFields.add("限流次数");
        if (prompt == null) missingFields.add("AI提问词");

        if (!missingFields.isEmpty()) {
            throw new IllegalStateException("参数不能为空: " + String.join(", ", missingFields));
        }
    }

    /**
     * 判断密钥是否已经加密
     */
    public boolean isEncryptedKey(String key) {
        try {
            // 尝试解密,如果能成功解密则认为已经是加密过的
            decryptKey(key);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * 加密密钥
     **/
    public String encryptKey(String key) throws Exception {
        SecretKeySpec secretKey = new SecretKeySpec(SALT.getBytes(StandardCharsets.UTF_8), "AES");
        Cipher cipher = Cipher.getInstance(ALGORITHM);
        cipher.init(Cipher.ENCRYPT_MODE, secretKey);
        byte[] encryptedKey = cipher.doFinal(key.getBytes(StandardCharsets.UTF_8));
        return Base64.getEncoder().encodeToString(encryptedKey);
    }

    /**
     * 解密密钥
     */
    public String decryptKey(String encryptedKey) throws Exception {
        SecretKeySpec secretKey = new SecretKeySpec(SALT.getBytes(StandardCharsets.UTF_8), "AES");
        Cipher cipher = Cipher.getInstance(ALGORITHM);
        cipher.init(Cipher.DECRYPT_MODE, secretKey);
        byte[] decryptedKey = cipher.doFinal(Base64.getDecoder().decode(encryptedKey));
        return new String(decryptedKey, StandardCharsets.UTF_8);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2328815.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

网络编程—Socket套接字(UDP)

上篇文章&#xff1a; 网络编程—网络概念https://blog.csdn.net/sniper_fandc/article/details/146923380?fromshareblogdetail&sharetypeblogdetail&sharerId146923380&sharereferPC&sharesourcesniper_fandc&sharefromfrom_link 目录 1 概念 2 Soc…

视频设备轨迹回放平台EasyCVR综合智能化,搭建运动场体育赛事直播方案

一、背景 随着5G技术的发展&#xff0c;体育赛事直播迎来了新的高峰。无论是NBA、西甲、英超、德甲、意甲、中超还是CBA等热门赛事&#xff0c;都是值得记录和回放的精彩瞬间。对于体育迷来说&#xff0c;选择观看的平台众多&#xff0c;但是作为运营者&#xff0c;搭建一套体…

AIGC实战——CycleGAN详解与实现

AIGC实战——CycleGAN详解与实现 0. 前言1. CycleGAN 基本原理2. CycleGAN 模型分析3. 实现 CycleGAN小结系列链接 0. 前言 CycleGAN 是一种用于图像转换的生成对抗网络(Generative Adversarial Network, GAN)&#xff0c;可以在不需要配对数据的情况下将一种风格的图像转换成…

VS2022远程调试Linux程序

一、 1、VS2022安装参考 VS Studio2022安装教程&#xff08;保姆级教程&#xff09;_visual studio 2022-CSDN博客 注意&#xff1a;勾选的时候&#xff0c;要勾选下方的选项&#xff0c;才能调试Linux环境下运行的程序&#xff01; 2、VS2022远程调试Linux程序测试 原文参…

345-java人事档案管理系统的设计与实现

345-java人事档案管理系统的设计与实现 项目概述 本项目为基于Java语言的人事档案管理系统&#xff0c;旨在帮助企事业单位高效管理员工档案信息&#xff0c;实现档案的电子化、自动化管理。系统涵盖了员工信息的录入、查询、修改、删除等功能&#xff0c;同时具备权限控制和…

【Linux系统编程】进程概念,进程状态

目录 一&#xff0c;操作系统&#xff08;Operator System&#xff09; 1-1概念 1-2设计操作系统的目的 1-3核心功能 1-4系统调用和库函数概念 二&#xff0c;进程&#xff08;Process&#xff09; 2-1进程概念与基本操作 2-2task_struct结构体内容 2-3查看进程 2-4通…

优选算法的妙思之流:分治——快排专题

专栏&#xff1a;算法的魔法世界 个人主页&#xff1a;手握风云 目录 一、快速排序 二、例题讲解 2.1. 颜色分类 2.2. 排序数组 2.3. 数组中的第K个最大元素 2.4. 库存管理 III 一、快速排序 分治&#xff0c;简单理解为“分而治之”&#xff0c;将一个大问题划分为若干个…

wx206基于ssm+vue+uniapp的优购电商小程序

开发语言&#xff1a;Java框架&#xff1a;ssmuniappJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包&#xff1a;M…

React编程高级主题:错误处理(Error Handling)

文章目录 **5.2 错误处理&#xff08;Error Handling&#xff09;概述****5.2.1 onErrorReturn / onErrorResume&#xff08;错误回退&#xff09;****1. onErrorReturn&#xff1a;提供默认值****2. onErrorResume&#xff1a;切换备用数据流** **5.2.2 retry / retryWhen&…

ubuntu20.04升级成ubuntu22.04

命令行 sudo do-release-upgrade 我是按提示输入y确认操作&#xff0c;也可以遇到配置文件冲突时建议选择N保留当前配置

SpringCloud(25)——Stream介绍

1.场景描述 当我们的分布式系统建设到一定程度了&#xff0c;或者服务间是通过异步请求来通讯的&#xff0c;那么我们避免不了使用MQ来解决问题。 假如公司内部进行了业务合并或者整合&#xff0c;需要服务A和服务B通过MQ的方式进行消息传递&#xff0c;而服务A用的是RabbitMQ&…

centos8上实现lvs集群负载均衡dr模式

1.前言 个人备忘笔记&#xff0c;欢迎探讨。 centos8上实现lvs集群负载均衡nat模式 centos8上实现lvs集群负载均衡dr模式 之前写过一篇lvs-nat模式。实验起来相对顺利。dr模式最大特点是响应报文不经调度器&#xff0c;而是直接返回客户机。 dr模式分同网段和不同网段。同…

uniapp如何接入星火大模型

写在前面&#xff1a;最近的ai是真的火啊&#xff0c;琢磨了一下&#xff0c;弄个uniappx的版本开发个东西玩一下&#xff0c;想了想不知道放啥内容&#xff0c;突然觉得deepseek可以接入&#xff0c;好家伙&#xff0c;一对接以后发现这是个付费的玩意&#xff0c;我穷&#x…

MySQL vs MSSQL 对比

在企业数据库管理系统中&#xff0c;MySQL 和 Microsoft SQL Server&#xff08;MSSQL&#xff09;是最受欢迎的两大选择。MySQL 是一款开源的关系型数据库管理系统&#xff08;RDBMS&#xff09;&#xff0c;由 MySQL AB 开发&#xff0c;现归属于 Oracle 公司。而 MSSQL 是微…

python基础-10-组织文件

文章目录 【README】【10】组织文件&#xff08;复制移动删除重命名&#xff09;【10.1】shutil模块(shell工具)【10.1.1】复制文件和文件夹【10.1.1.1】复制文件夹及其下文件-shutil.copytree 【10.1.2】文件和文件夹的移动与重命名【10.1.3】永久删除文件和文件夹【10.1.4】用…

ORA-09925 No space left on device 问题处理全过程记录

本篇文章关键字&#xff1a;linux、oracle、审计、ORA-09925 一、故障现像 朋友找到我说是他们备份软件上报错。 问题比较明显&#xff0c;ORA-09925&#xff0c;看起来就是空间不足导致的 二、问题分析过程 这里说一下逐步的分析思路&#xff0c;有个意外提前说一下就是我…

多输入多输出 | Matlab实现BO-GRU贝叶斯优化门控循环单元多输入多输出预测

多输入多输出 | Matlab实现BO-GRU贝叶斯优化门控循环单元多输入多输出预测 目录 多输入多输出 | Matlab实现BO-GRU贝叶斯优化门控循环单元多输入多输出预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 Matlab实现BO-GRU贝叶斯优化门控循环单元多输入多输出预测&#…

27信号和槽_自定义信号(2)

自定义信号和槽 绑定信号和槽 如何才能触发出自定义的信号呢?&#xff08;上诉代码只是将信号和槽绑定在一起&#xff0c;但并没有触发信号&#xff09; Qt 内置的信号,都不需要咱们手动通过代码来触发 用户在 GUI, 进行某些操作,就会自动触发对应信号.(发射信号的代码已经内置…

人工智能在生物医药领域的应用地图:AIBC2025将于6月在上海召开!

人工智能在生物医药领域的应用地图&#xff1a;AIBC2025将于6月在上海召开&#xff01; 近年来&#xff0c;人工智能在生物医药行业中的应用受到广泛关注。 2024年10月&#xff0c;2024诺贝尔化学奖被授予“计算蛋白质设计和蛋白质结构预测”&#xff0c;这为行业从业人员带来…

2025.3.19

1、用vim编辑/etc/hosts文件&#xff0c;将本机和第二个虚拟机的ip地址和主机名写入该文件&#xff0c;然后ping 两个主机的主机名能否ping通&#xff1b; &#xff08;1&#xff09;在第一个虚拟机编辑/etc/hosts: 首先使用hostname、hostnamectl、hostname -f指令查看主机名…