前后端: Spring Boot + Angular
spring-webmvc-5.2.2包
代码片段如下:
控制层:
@GetMapping(value = "/realtime/page/ai/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ApiOperation(value = "获取告警记录进行AI分析")
public SseEmitter getRealTimeAlarmAi(AlarmRecordQueryParam queryParam) {
final DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
IPage<AlarmRecord> page = alarmRecordService.findRealTimeByParam(queryParam);
StringBuilder alarmInfo = new StringBuilder();
try {
// 根据状态设置前缀
String prefix = queryParam.getStatus() == 1 ?
"最近十条历史告警记录:" : "当前十条实时告警信息:";
String emptyMessage = queryParam.getStatus() == 1 ?
"暂无历史告警" : "当前无实时告警";
if (page.getRecords() != null && !page.getRecords().isEmpty()) {
alarmInfo.append(prefix);
sseService.buildAlarmContent(page, alarmInfo, timeFormatter);
} else {
alarmInfo.append(emptyMessage);
}
sseService.validatePromptLength(alarmInfo, maxPromptLength);
} catch (Exception e) {
log.error("告警信息处理异常", e);
}
return sseService.createStreamConnection(alarmInfo.toString(), "告警");
}
@ApiOperation("查询图表数据用AI分析数据详情")
@GetMapping("/chart/ai/sse")
@OpLog(
inputExpression = "开始时间:{#queryParam.startTime},结束时间:{#queryParam.endTime},图表ID:{#queryParam.chartId}",
outputExpression = "{#code}"
)
public SseEmitter chartAiSSEData(@Validated ChartDataQueryParam queryParam) throws Exception {
String ChartAi = "报表";
ChartInstance chart = Optional.ofNullable(chartService.getById(queryParam.getChartId()))
.orElseThrow(() -> new Exception("找不到:" + queryParam.getChartId() + "的图表定义"));
List<ChartDeviceSensor> deviceSensors = ChartInstance.toChartDeviceSensorList(chart);
String endTime = DataQueryParam.endTime(queryParam.getEndTime());
DataQueryParam dataQueryParam = new DataQueryParam(queryParam.getStartTime(), endTime, deviceSensors);
IChartDataService chartDataService = chartDataServiceManager.getInstance(chart.getChartTypeId());
List dataList = chartDataService.getChartData(dataQueryParam);
List<String> times = dataQueryParam.getDateType().getTimes(dataQueryParam.getStartTime(), dataQueryParam.getEndTime());
ChartData chartData = new ChartData<>(chart.getId(), chart.getName(), chart.getChartFormat(), chart.getChartTypeId(), chart.getShowType(), chart.getCategoryId(), times, dataList);
// 将 ChartData 转换为压缩字符串
String csvData = ChartDataFormatter.formatChartData(chartData);
log.info("当前请求字符长度:" + csvData.length());
try {
if (csvData.length() > maxPromptLength) {
OpLogAspect.setCode(400); // 设置错误码
throw new IllegalArgumentException("数据长度超过限制,最大允许长度:" + maxPromptLength);
}
OpLogAspect.setCode(200);
return sseService.createStreamConnection(csvData,ChartAi);
} catch (IllegalArgumentException e) {
OpLogAspect.setCode(400); // 参数错误
throw e;
} catch (Exception e) {
OpLogAspect.setCode(500); // 系统错误
throw new RuntimeException("处理请求失败", e);
}
}
业务层代码:
package com.keydak.project.core.chart.ai.service.impl;
import cn.hutool.core.lang.UUID;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.keydak.project.core.alarm.data.vo.AlarmRecord;
import com.keydak.project.core.chart.ai.dto.KeydakAiConfigDTO;
import com.keydak.project.core.chart.ai.exception.BalanceException;
import com.keydak.project.core.chart.ai.service.SSEService;
import com.keydak.repository.core.enums.SystemGlobalConfigEnum;
import com.keydak.repository.core.service.ISystemGlobalConfigService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import com.fasterxml.jackson.core.type.TypeReference;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* AI服务实现
*
* @author xyt
*/
@Service
@Slf4j
public class SSEServiceImpl implements SSEService {
@Autowired
private ISystemGlobalConfigService systemGlobalConfigService;
private final ObjectMapper objectMapper = new ObjectMapper();
private RateLimiter rateLimiter;
@PostConstruct
public void init() {
try {
// 初始化限流器时动态获取配置
KeydakAiConfigDTO initialConfig = getConfig();
rateLimiter = new RateLimiter(initialConfig.getRateLimit());
} catch (Exception e) {
throw new RuntimeException("初始化失败,无法获取Keydak AI配置", e);
}
}
// 线程池配置
private static final int CORE_POOL_SIZE = 5; // 核心线程数
private static final int MAX_POOL_SIZE = 8; // 最大线程数
private static final long KEEP_ALIVE_TIME = 30; // 线程空闲时间
private static final int QUEUE_CAPACITY = 30; //队列
private final ExecutorService executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.AbortPolicy() // 使用 AbortPolicy 直接拒绝任务而不执行
);
/**
* 刷新限流器配置
*/
@Override
public synchronized void refreshRateLimiter(Integer rate) {
try {
if (rateLimiter == null) {
rateLimiter = new RateLimiter(rate);
} else {
rateLimiter.updateRate(rate);
}
log.info("限流器已更新,新速率限制: {}", rate);
} catch (Exception e) {
log.error("刷新限流器配置失败", e);
}
}
@PreDestroy
public void destroy() {
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
/**
* 获取Keydak AI配置信息。
*
* @return 返回Keydak AI配置信息的DTO对象
*/
private KeydakAiConfigDTO getConfig() throws Exception {
KeydakAiConfigDTO config = systemGlobalConfigService.getTag(
SystemGlobalConfigEnum.KEYDAK_AI_CONFIG,
KeydakAiConfigDTO.class
);
if (config == null) {
throw new Exception("Keydak AI配置不存在");
}
return config;
}
@Override
public SseEmitter createStreamConnection(String message, String aiType) {
SseEmitter emitter = new SseEmitter(120_000L); // 2分钟超时
try {
KeydakAiConfigDTO config = getConfig();
double balance = getBalance();
log.info("当前余额: {} 元", balance);
log.warn("当前可用令牌数: {}", rateLimiter.tokens.get());
if (!rateLimiter.tryAcquire()) {
log.warn("请求被限流 | 当前允许的QPS:{}", config.getRateLimit());
handleRateLimitError(emitter);
return emitter;
}
} catch (BalanceException e) {
handleBalanceError(emitter, e.getMessage());
return emitter;
} catch (Exception e) {
handleBalanceError(emitter, "系统错误: " + e.getMessage());
return emitter;
}
// 保持原有事件监听
emitter.onCompletion(() -> log.info("SSE连接完成"));
emitter.onTimeout(() -> {
log.warn("SSE连接超时");
rateLimiter.refill(); // 超时请求返还令牌
});
emitter.onError(e -> log.error("SSE连接错误", e));
// 保持原有线程池处理
executor.execute(() -> {
try {
processSSEStream(message, aiType, emitter);
} catch (Exception e) {
emitter.completeWithError(e);
}
});
return emitter;
}
/**
* 新增限流错误处理方法
*
* @param emitter 事件发射器
* @throws IOException 如果发送失败
*/
private void handleRateLimitError(SseEmitter emitter) {
try {
Map<String, Object> error = new LinkedHashMap<>();
error.put("error", "rate_limit_exceeded");
error.put("message", "请求过于频繁,请稍后再试");
error.put("timestamp", System.currentTimeMillis());
emitter.send(SseEmitter.event()
.data(objectMapper.writeValueAsString(error))
.name("rate-limit-error")
.reconnectTime(5000L));
emitter.complete();
} catch (IOException e) {
log.error("发送限流错误失败", e);
}
}
private void handleBalanceError(SseEmitter emitter, String errorMsg) {
try {
JSONObject error = new JSONObject();
error.put("error", "balance_insufficient");
error.put("message", errorMsg);
emitter.send(SseEmitter.event()
.data(error.toJSONString())
.name("balance-error"));
emitter.complete();
} catch (Exception e) {
log.error("发送余额错误信息失败", e);
}
}
private void processSSEStream(String message, String aiType, SseEmitter emitter) throws Exception {
HttpURLConnection connection = null;
try {
connection = createConnection();
String jsonBody = buildRequestBody(message, aiType);
log.info("发送AI请求数据: {}", jsonBody); // 记录请求体
sendRequestData(connection, jsonBody);
validateResponse(connection);
try (InputStream inputStream = connection.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
String line;
while ((line = reader.readLine()) != null) {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("处理被中断");
}
if (line.startsWith("data: ")) {
String jsonData = line.substring(6).trim();
log.debug("AI响应数据: {}", jsonData);
if ("[DONE]".equals(jsonData)) {
log.info("收到流结束标记");
sendCompletionEvent(emitter); // 发送完成事件
break; // 结束循环
}
try {
processStreamData(emitter, jsonData);
} catch (Exception e) {
log.error("数据处理失败,终止连接", e);
emitter.completeWithError(e);
break;
}
}
}
}
} catch (Exception e) {
log.error("SSE处理发生异常", e);
throw e;
} finally {
if (connection != null) connection.disconnect();
}
}
private void processStreamData(SseEmitter emitter, String jsonData) throws Exception {
try {
Map<String, Object> apiResponse = objectMapper.readValue(
jsonData,
new TypeReference<Map<String, Object>>() {
}
);
List<Map<String, Object>> choices = (List<Map<String, Object>>) apiResponse.get("choices");
if (choices == null || choices.isEmpty()) return;
Map<String, Object> choice = choices.get(0);
Map<String, Object> delta = (Map<String, Object>) choice.get("delta");
Map<String, Object> chunk = new LinkedHashMap<>();
chunk.put("timestamp", System.currentTimeMillis());
chunk.put("messageId", UUID.randomUUID().toString());
// 处理思考过程
if (delta.containsKey("reasoning_content")) {
String reasoning = (String) delta.get("reasoning_content");
if (reasoning != null && !reasoning.trim().isEmpty()) {
chunk.put("type", "reasoning");
chunk.put("content", reasoning);
sendChunk(emitter, chunk);
}
}
// 处理正式回答
if (delta.containsKey("content")) {
String content = (String) delta.get("content");
if (content != null) {
chunk.put("type", "answer");
chunk.put("content", content);
sendChunk(emitter, chunk);
}
}
} catch (JsonProcessingException e) {
log.error("JSON解析失败 | 原始数据: {} | 错误: {}", jsonData, e.getMessage());
throw new IOException("Failed to process stream data", e);
} catch (ClassCastException e) {
log.error("数据结构类型错误 | 原始数据: {} | 错误: {}", jsonData, e.getMessage());
throw new IllegalStateException("Invalid data structure", e);
} catch (Exception e) {
log.error("处理数据块时发生未知错误 | 原始数据: {}", jsonData, e);
throw e;
}
}
private void sendChunk(SseEmitter emitter, Map<String, Object> chunk) throws IOException {
String chunkJson = objectMapper.writeValueAsString(chunk);
log.debug("发送数据块: {}", chunkJson);
SseEmitter.SseEventBuilder event = SseEmitter.event()
.data(chunkJson)
.id(UUID.randomUUID().toString())
.name("ai-message")
.reconnectTime(5000L);
emitter.send(event);
}
private void sendCompletionEvent(SseEmitter emitter) {
try {
Map<String, Object> completionEvent = new LinkedHashMap<>();
completionEvent.put("event", "done");
completionEvent.put("timestamp", System.currentTimeMillis());
completionEvent.put("messageId", UUID.randomUUID().toString());
String eventJson = objectMapper.writeValueAsString(completionEvent);
emitter.send(SseEmitter.event()
.data(eventJson)
.id("COMPLETION_EVENT")
.name("stream-end")
.reconnectTime(0L)); // 停止重连
log.info("已发送流结束事件");
} catch (IOException e) {
log.error("发送完成事件失败", e);
} finally {
emitter.complete();
log.info("SSE连接已关闭");
}
}
private HttpURLConnection createConnection() throws Exception {
KeydakAiConfigDTO config = getConfig();
HttpURLConnection connection = (HttpURLConnection) new URL(config.getUrl()).openConnection();
connection.setRequestMethod("POST");
connection.setDoOutput(true);
connection.setRequestProperty("Content-Type", "application/json");
connection.setRequestProperty("Authorization", "Bearer " + config.getKey());
connection.setRequestProperty("Accept", "text/event-stream");
connection.setConnectTimeout(30_000);
connection.setReadTimeout(120_000);
return connection;
}
private void sendRequestData(HttpURLConnection connection, String jsonBody) throws Exception {
try (OutputStream os = connection.getOutputStream()) {
os.write(jsonBody.getBytes(StandardCharsets.UTF_8));
os.flush();
}
}
private void validateResponse(HttpURLConnection connection) throws Exception {
if (connection.getResponseCode() != 200) {
String errorMsg = readErrorStream(connection);
throw new RuntimeException("API请求失败: " + connection.getResponseCode() + " - " + errorMsg);
}
}
private String readErrorStream(HttpURLConnection connection) throws IOException {
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(connection.getErrorStream(), StandardCharsets.UTF_8))) {
StringBuilder response = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
response.append(line);
}
return response.toString();
}
}
private String buildRequestBody(String userMessage, String aiType) throws IOException {
KeydakAiConfigDTO config = null;
try {
config = getConfig();
} catch (Exception e) {
e.printStackTrace();
}
Map<String, Object> request = new HashMap<>();
request.put("model", config.getModelType());
request.put("stream", true);
List<Map<String, String>> messages = new ArrayList<>();
Map<String, String> message = new HashMap<>();
message.put("role", "user");
if ("报表".equals(aiType)) {
//报表提问词
message.put("content", buildPrompt(config.getPrompt(), userMessage));
} else {
//告警提问词
message.put("content", buildPrompt(config.getPromptAlarm(), userMessage));
}
messages.add(message);
request.put("messages", messages);
return objectMapper.writeValueAsString(request);
}
private String buildPrompt(String basePrompt, String userMessage) {
return String.format("%s\n%s\n", basePrompt, userMessage);
}
/**
* 查询当前余额
*
* @return 当前余额
* @throws IOException 如果请求失败
*/
@Override
@SneakyThrows
public double getBalance() {
HttpURLConnection connection = null;
try {
KeydakAiConfigDTO config = getConfig();
URL url = new URL(config.getBalanceUrl());
connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("GET");
connection.setRequestProperty("Authorization", "Bearer " + config.getKey());
connection.setConnectTimeout(5000);
connection.setReadTimeout(5000);
int responseCode = connection.getResponseCode();
if (responseCode != 200) {
String errorBody = readErrorStream(connection); // 复用已有的错误流读取方法
throw new IOException("HTTP Error: " + responseCode + " - " + errorBody);
}
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
StringBuilder response = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
response.append(line);
}
JSONObject jsonObject = JSON.parseObject(response.toString());
// 以下解析逻辑保持原样
if (!jsonObject.containsKey("is_available") || !jsonObject.containsKey("balance_infos")) {
throw new IOException("Invalid balance response format");
}
JSONArray balanceInfos = jsonObject.getJSONArray("balance_infos");
if (jsonObject.getBoolean("is_available") && balanceInfos != null && !balanceInfos.isEmpty()) {
JSONObject balanceInfo = balanceInfos.getJSONObject(0);
if (!balanceInfo.containsKey("total_balance")) {
throw new IOException("Missing total_balance field");
}
return balanceInfo.getDouble("total_balance");
} else {
throw new IOException("Balance information is not available");
}
}
} finally {
if (connection != null) {
connection.disconnect();
}
}
}
/**
* 限流器实现
**/
private static class RateLimiter {
private volatile int capacity;
private final AtomicInteger tokens;
private volatile long lastRefillTime;
private final Object lock = new Object();
RateLimiter(int rate) {
this.capacity = rate;
this.tokens = new AtomicInteger(rate);
this.lastRefillTime = System.currentTimeMillis();
}
public void refill() {
synchronized (lock) {
long now = System.currentTimeMillis();
long elapsed = now - lastRefillTime;
if (elapsed >= 1000) {
tokens.set(capacity); // 直接重置为最大容量
lastRefillTime = now;
}
}
}
public boolean tryAcquire() {
synchronized (lock) {
refill();
if (tokens.get() > 0) {
tokens.decrementAndGet();
return true;
}
return false;
}
}
public void updateRate(int newRate) {
synchronized (lock) {
this.capacity = newRate;
tokens.set(Math.min(tokens.get(), newRate));
lastRefillTime = System.currentTimeMillis();
}
}
}
/**
* 告警内容构建方法
**/
@Override
public void buildAlarmContent(IPage<AlarmRecord> page,
StringBuilder alarmInfo,
DateTimeFormatter formatter) {
page.getRecords().forEach(record -> {
// 时间格式化(使用首次告警时间)
String time = Optional.ofNullable(record.getFirstTime())
.map(t -> t.format(formatter))
.orElse("时间未知");
// 设备名称空值处理
String device = StringUtils.defaultString(record.getDeviceName(), "未知设备");
// 状态/数值处理逻辑
String state = resolveStateValue(record);
// 告警描述处理
String desc = StringUtils.defaultString(record.getContent(), "未知告警类型");
// 按规范格式拼接
alarmInfo.append(
String.format("%s %s %s %s;", time, device, state, desc)
);
});
}
/**
* 状态值解析方法
*/
private String resolveStateValue(AlarmRecord record) {
if (record.getValue() != null) {
return record.getValue().stripTrailingZeros().toPlainString();
}
return record.getStatus() != null ?
(record.getStatus() ? "1" : "0") : "状态未知";
}
/**
* 长度校验方法
**/
@Override
public void validatePromptLength(StringBuilder content, int maxLength) {
if (content.length() > maxLength) {
throw new IllegalArgumentException("告警数据过长,请缩小查询范围");
}
}
}
前端代码:
<div class="modal-area">
<form name="formNg" novalidate>
<div class="modal-header">
<h3 class="modal-title" style="color: #FFFFFF">
AI分析
</h3>
</div>
<div class="modal-body">
<div class="form">
<!-- 加载状态 - 修改为动态效果 -->
<div ng-if="connectionStatus === 'connecting'" class="loading">
<div class="ai-thinking-container">
<span>AI思考中</span>
<div class="ai-typing-indicator">
<div class="typing-dot"></div>
<div class="typing-dot"></div>
<div class="typing-dot"></div>
</div>
</div>
</div>
<!-- 思考过程 -->
<div class="thinking-panel" ng-if="thinkingContent">
<div class="thinking-header">
<i class="fa fa-brain"></i> 思考过程
<!-- 总用时显示(完成后保留) -->
<span ng-if="thinkingTime">({{thinkingTime}}秒)</span>
</div>
<div class="thinking-content"
ng-bind-html="thinkingContent"
scroll-to-bottom="thinkingContent">
</div>
</div>
<!-- 正式回答 -->
<div class="answer-panel" ng-if="answerContent">
<div class="answer-header">
<i class="fa fa-comment"></i> 以下是AI的分析
</div>
<div class="answer-content"
ng-bind-html="answerContent"
scroll-to-bottom="answerContent">
</div>
</div>
<!-- 错误提示 -->
<div ng-if="connectionStatus === 'error'" class="alert alert-danger">
<i class="fa fa-exclamation-triangle"></i> 连接异常,请尝试重新分析
</div>
</div>
</div>
<div class="modal-footer">
<button ng-click="retry()"
class="btn btn-warning"
ng-disabled="connectionStatus === 'connecting'">
<i class="fa fa-redo"></i> 重新分析
</button>
<button ng-click="cancel()" class="btn btn-danger">
<i class="fa fa-times"></i> 关闭
</button>
</div>
</form>
</div>
<style>
.thinking-header span {
margin-left: 5px;
font-size: 0.9em;
opacity: 0.8;
color: #a0c4ff;
}
.modal-body {
height: 6rem; /* 设置固定高度 */
overflow-y: auto; /* 内容超出时显示滚动条 */
padding: 15px;
background: #1B448A; /* 背景改为蓝色 */
border-radius: 4px;
font-family: 'Consolas', monospace;
color: #FFFFFF;
}
/* 思考过程样式 */
.thinking-panel {
margin-bottom: 20px;
border-left: 3px solid #4a90e2;
padding-left: 15px;
}
.thinking-header {
color: #4a90e2;
font-size: 16px;
margin-bottom: 10px;
}
.thinking-content {
background: rgba(255, 255, 255, 0.05);
padding: 12px;
border-radius: 4px;
color: #e0e0e0;
line-height: 1.6;
}
/* 正式回答样式 */
.answer-panel {
margin-top: 25px;
border-top: 1px solid #00c85333;
padding-top: 15px;
}
.answer-header {
color: #00c853;
font-size: 16px;
margin-bottom: 10px;
}
.answer-content {
background: rgba(255, 255, 255, 0.05);
padding: 12px;
border-radius: 4px;
color: #ffffff;
line-height: 1.6;
}
/* 图标样式 */
.fa-brain {
color: #4a90e2;
margin-right: 8px;
}
.fa-comment {
color: #00c853;
margin-right: 8px;
}
/* 新的加载动画样式 */
.loading {
color: #FFF;
text-align: left;
padding: 15px;
font-size: 16px;
}
.ai-thinking-container {
display: flex;
align-items: center;
gap: 8px;
}
.ai-typing-indicator {
display: flex;
align-items: center;
gap: 4px;
height: 20px;
}
.typing-dot {
width: 8px;
height: 8px;
background-color: #FFFFFF;
border-radius: 50%;
opacity: 0.4;
animation: typing-animation 1.4s infinite ease-in-out;
}
.typing-dot:nth-child(1) {
animation-delay: 0s;
}
.typing-dot:nth-child(2) {
animation-delay: 0.2s;
}
.typing-dot:nth-child(3) {
animation-delay: 0.4s;
}
@keyframes typing-animation {
0%, 60%, 100% {
transform: translateY(0);
opacity: 0.4;
}
30% {
transform: translateY(-5px);
opacity: 1;
}
}
</style>
// 报表分析
UI.Controllers.controller("AiTipsCtrl", [
"$scope", "$sce", "$uibModalInstance", "parent", "SSEService", "$timeout",
function($scope, $sce, $uibModalInstance, parent, SSEService, $timeout) {
// 状态管理
$scope.connectionStatus = 'connecting'; // connecting | connected | error | completed
$scope.thinkingContent = null;
$scope.answerContent = null;
$scope.thinkingTime = null; // 新增:思考时间变量
$scope.startTime = null; // 新增:开始时间戳
let eventSource = null;
let thinkingBuffer = "";
let answerBuffer = "";
// 自动滚动指令
$scope.scrollToBottom = function() {
$timeout(() => {
const container = document.querySelector('.modal-body');
if (container) {
container.scrollTop = container.scrollHeight + 120;
}
}, 50);
};
// 内容更新方法
function processChunkData(data) {
if (data.type === 'reasoning') {
// 如果是第一条思考内容,记录开始时间
if (!thinkingBuffer && !$scope.startTime) {
$scope.startTime = new Date().getTime();
}
thinkingBuffer += data.content;
$scope.thinkingContent = $sce.trustAsHtml(
thinkingBuffer.replace(/\n/g, '<br/>').replace(/ {2}/g, ' ')
);
// 更新思考时间
updateThinkingTime();
}
else if (data.type === 'answer') {
answerBuffer += data.content;
$scope.answerContent = $sce.trustAsHtml(
answerBuffer.replace(/\n/g, '<br/>').replace(/ {2}/g, ' ')
);
}
$scope.scrollToBottom();
}
function updateThinkingTime() {
if ($scope.startTime) {
const currentTime = new Date().getTime();
$scope.thinkingTime = ((currentTime - $scope.startTime) / 1000).toFixed(1);
}
}
// 初始化SSE连接
function initSSE() {
const url = '/data/chart/ai/sse?' + $.param(parent.queryParam);
eventSource = new EventSource(url);
eventSource.onopen = () => {
$scope.$apply(() => {
$scope.connectionStatus = 'connected';
});
};
// 处理消息事件
eventSource.addEventListener('ai-message', e => {
$scope.$apply(() => {
try {
const data = JSON.parse(e.data);
processChunkData(data);
} catch (err) {
console.error('消息解析错误:', err);
$scope.answerContent = $sce.trustAsHtml(
'<div class="text-danger">数据格式错误</div>'
);
}
});
});
// 处理结束事件
eventSource.addEventListener('stream-end', () => {
$scope.$apply(() => {
$scope.connectionStatus = 'completed';
//最终更新一次思考时间
updateThinkingTime();
safeClose();
});
});
// 错误处理
eventSource.onerror = (err) => {
$scope.$apply(() => {
console.error('SSE连接错误:', err);
$scope.connectionStatus = 'error';
safeClose();
});
};
}
// 安全关闭连接
function safeClose() {
if (eventSource) {
eventSource.close();
eventSource = null;
}
}
// 重新尝试
$scope.retry = () => {
safeClose();
thinkingBuffer = "";
answerBuffer = "";
$scope.thinkingContent = null;
$scope.answerContent = null;
$scope.thinkingTime = null; //重置思考时间
$scope.startTime = null; //重置开始时间
$scope.connectionStatus = 'connecting';
initSSE();
};
// 关闭模态框
$scope.cancel = () => {
safeClose();
$uibModalInstance.dismiss();
};
// 初始化
initSSE();
// 清理
$scope.$on('$destroy', () => {
safeClose();
});
}
]);
// 告警分析
UI.Controllers.controller("AiAlarmTipsCtrl", [
"$scope", "$sce", "$uibModalInstance", "parent", "SSEService", "$timeout",
function($scope, $sce, $uibModalInstance, parent, SSEService, $timeout) {
// 状态管理
$scope.connectionStatus = 'connecting'; // connecting | connected | error | completed
$scope.thinkingContent = null;
$scope.answerContent = null;
$scope.thinkingTime = null; // 新增:思考时间变量
$scope.startTime = null; // 新增:开始时间戳
let eventSource = null;
let thinkingBuffer = "";
let answerBuffer = "";
// 自动滚动指令
$scope.scrollToBottom = function() {
$timeout(() => {
const container = document.querySelector('.modal-body');
if (container) {
container.scrollTop = container.scrollHeight + 120;
}
}, 50);
};
// 内容更新方法
function processChunkData(data) {
if (data.type === 'reasoning') {
// 如果是第一条思考内容,记录开始时间
if (!thinkingBuffer && !$scope.startTime) {
$scope.startTime = new Date().getTime();
}
thinkingBuffer += data.content;
$scope.thinkingContent = $sce.trustAsHtml(
thinkingBuffer.replace(/\n/g, '<br/>').replace(/ {2}/g, ' ')
);
// 更新思考时间
updateThinkingTime();
}
else if (data.type === 'answer') {
answerBuffer += data.content;
$scope.answerContent = $sce.trustAsHtml(
answerBuffer.replace(/\n/g, '<br/>').replace(/ {2}/g, ' ')
);
}
$scope.scrollToBottom();
}
// 更新思考时间
function updateThinkingTime() {
if ($scope.startTime) {
const currentTime = new Date().getTime();
$scope.thinkingTime = ((currentTime - $scope.startTime) / 1000).toFixed(1);
}
}
// 初始化SSE连接
function initSSE() {
const url = '/alarm/record/realtime/page/ai/sse?' + $.param(parent.queryParam);
eventSource = new EventSource(url);
eventSource.onopen = () => {
$scope.$apply(() => {
$scope.connectionStatus = 'connected';
});
};
// 处理消息事件
eventSource.addEventListener('ai-message', e => {
$scope.$apply(() => {
try {
const data = JSON.parse(e.data);
processChunkData(data);
} catch (err) {
console.error('消息解析错误:', err);
$scope.answerContent = $sce.trustAsHtml(
'<div class="text-danger">数据格式错误</div>'
);
}
});
});
// 处理结束事件
eventSource.addEventListener('stream-end', () => {
$scope.$apply(() => {
$scope.connectionStatus = 'completed';
// 最终更新一次思考时间
updateThinkingTime();
safeClose();
});
});
// 错误处理
eventSource.onerror = (err) => {
$scope.$apply(() => {
console.error('SSE连接错误:', err);
$scope.connectionStatus = 'error';
safeClose();
});
};
}
// 安全关闭连接
function safeClose() {
if (eventSource) {
eventSource.close();
eventSource = null;
}
}
// 重新尝试
$scope.retry = () => {
safeClose();
thinkingBuffer = "";
answerBuffer = "";
$scope.thinkingContent = null;
$scope.answerContent = null;
$scope.thinkingTime = null; // 重置思考时间
$scope.startTime = null; // 重置开始时间
$scope.connectionStatus = 'connecting';
initSSE();
};
// 关闭模态框
$scope.cancel = () => {
safeClose();
$uibModalInstance.dismiss();
};
// 初始化
initSSE();
// 清理
$scope.$on('$destroy', () => {
safeClose();
});
}
]);
showAiTips: function (resolve) {
this.showDialog("Template/AiTips.html", "AiTipsCtrl", resolve, 600);
},
showAiAlarmTips: function (resolve) {
this.showDialog("Template/AiAlarmTips.html", "AiAlarmTipsCtrl", resolve, 600);
}
数据库结构:
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'balanceUrl', 'https://api.deepseek.com/user/balance', '余额查询');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'enable', 'true', '启用AI报表助手');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'key', '', 'API密钥');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'modelType', 'deepseek-reasoner', 'deepseek模型类型');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'prompt', '', 'AI提问词');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'promptAlarm', '', 'AI提问词');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'rateLimit', '3', '限制每秒多少次请求');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'url', 'https://api.deepseek.com/v1/chat/completions', 'API接口');
实体类(使用AES加密 密钥):
package com.keydak.project.core.chart.ai.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.util.*;
/**
* AI配置信息
*
* @author xyt
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class KeydakAiConfigDTO {
private Boolean enable;
/**
* API_URL地址
**/
private String url;
/**
* 查询余额地址
**/
private String balanceUrl;
/**
* API密钥
**/
private String key;
/**
* 限流次数
**/
private Integer rateLimit;
/**
* AI提问词(报表)
**/
private String prompt;
/**
* AI提问词(告警)
**/
private String promptAlarm;
/**
* 模型类型
**/
private String modelType;
private static final String SALT = ""; // 16 bytes for AES-128
private static final String ALGORITHM = "AES/ECB/PKCS5Padding";
public void validate() {
List<String> missingFields = new ArrayList<>();
if (url == null) missingFields.add("API_URL地址");
if (balanceUrl == null) missingFields.add("查询余额地址");
if (key == null) missingFields.add("API密钥");
if (rateLimit == null) missingFields.add("限流次数");
if (prompt == null) missingFields.add("AI提问词");
if (!missingFields.isEmpty()) {
throw new IllegalStateException("参数不能为空: " + String.join(", ", missingFields));
}
}
/**
* 判断密钥是否已经加密
*/
public boolean isEncryptedKey(String key) {
try {
// 尝试解密,如果能成功解密则认为已经是加密过的
decryptKey(key);
return true;
} catch (Exception e) {
return false;
}
}
/**
* 加密密钥
**/
public String encryptKey(String key) throws Exception {
SecretKeySpec secretKey = new SecretKeySpec(SALT.getBytes(StandardCharsets.UTF_8), "AES");
Cipher cipher = Cipher.getInstance(ALGORITHM);
cipher.init(Cipher.ENCRYPT_MODE, secretKey);
byte[] encryptedKey = cipher.doFinal(key.getBytes(StandardCharsets.UTF_8));
return Base64.getEncoder().encodeToString(encryptedKey);
}
/**
* 解密密钥
*/
public String decryptKey(String encryptedKey) throws Exception {
SecretKeySpec secretKey = new SecretKeySpec(SALT.getBytes(StandardCharsets.UTF_8), "AES");
Cipher cipher = Cipher.getInstance(ALGORITHM);
cipher.init(Cipher.DECRYPT_MODE, secretKey);
byte[] decryptedKey = cipher.doFinal(Base64.getDecoder().decode(encryptedKey));
return new String(decryptedKey, StandardCharsets.UTF_8);
}
}