探索DataLoom的智能问数功能:简化数据库查询
在数据驱动的决策制定中,数据库查询是获取洞察的关键步骤。但是,传统的数据库查询方法往往复杂且技术性强,这限制了非技术用户的使用。DataLoom的智能问数功能正是为了解决这一问题而设计的。本文将详细介绍这一功能,并展示其背后的代码实现。
DataLoom简介
DataLoom是一个创新的数据管理平台,旨在通过提供直观的界面和强大的后端处理能力,简化数据查询和分析过程。我们的目标是让数据查询变得简单,让每个人都能轻松地从数据中获取洞察。
智能问数功能
智能问数是DataLoom的核心功能之一,它允许用户通过自然语言输入查询请求,系统会自动将其转换为SQL语句并执行。这一功能极大地降低了技术门槛,使得即使是没有数据库背景的用户也能快速获取所需数据。
核心流程图
核心代码
ChatForSQLRequest类
@Data
public class ChatForSQLRequest {
/**
* 模型Id
*/
private Long chatId;
/**
* 询问的数据
*/
private String question;
}
这是一个简单的Java Bean类,用于封装用户请求智能问数时发送的数据。它包含两个属性:chatId
(模型Id)和question
(询问的数据)。
userChatForSQL方法
public void userChatForSQL(ChatForSQLRequest chatForSQLRequest, User loginUser) {
Long chatId = chatForSQLRequest.getChatId();
String question = chatForSQLRequest.getQuestion();
// 1. 获取模型ID
Chat chat = chatService.getById(chatId);
ThrowUtils.throwIf(chat == null, ErrorCode.PARAMS_ERROR, "不存在该助手");
// 2. 获取数据源所有的元数据
Long datasourceId = chat.getDatasourceId();
List<AskAIWithDataTablesAndFieldsRequest> dataTablesAndFieldsRequests = getAskAIWithDataTablesAndFieldsRequests(loginUser, datasourceId);
// 3. 构造请求AI的输入
String input = buildAskAISQLInput(dataTablesAndFieldsRequests, question);
// 4. 持久化消息
ChatHistory user_q = new ChatHistory();
user_q.setChatRole(ChatHistoryRoleEnum.USER.getValue());
user_q.setChatId(chatId);
user_q.setModelId(chat.getModelId());
user_q.setContent(question);
chatHistoryService.save(user_q);
// 5. 利用webSocket发送消息通知开始
AskSQLWebSocketMsgVO askSQLWebSocketMsgVO = new AskSQLWebSocketMsgVO();
askSQLWebSocketMsgVO.setType("start");
askSQLWebSocket.sendOneMessage(loginUser.getId(), askSQLWebSocketMsgVO);
// 6. 询问AI,获取返回的SQL
String sql = aiManager.doAskSQLWithKimi(input, LIMIT_RECORDS);
// 7. 执行SQL,并得到返回的结果
QueryAICustomSQLVO queryAICustomSQLVO = null;
try {
queryAICustomSQLVO = buildUserChatForSqlVO(datasourceId, sql);
} catch (Exception e) { // 防止异常发生,前端还继续等待接收数据
if (e instanceof SQLException) { // 记录异常
queryAICustomSQLVO = new QueryAICustomSQLVO();
queryAICustomSQLVO.setSql(sql);
ChatHistory chatHistory = ChatHistory.builder()
.chatRole(ChatHistoryRoleEnum.MODEL.getValue())
.chatId(chatId)
.modelId(chat.getModelId())
.status(ChatHistoryStatusEnum.FAIL.getValue())
.execMessage("查询异常")
.content(JSONUtil.toJsonStr(queryAICustomSQLVO))
.build();
chatHistoryService.updateById(chatHistory);
}
notifyMessageEnd(loginUser.getId());
return;
}
// 8. 将查询的结果存放在数据库中
ChatHistory chatHistory = new ChatHistory();
chatHistory.setChatRole(ChatHistoryRoleEnum.MODEL.getValue());
chatHistory.setChatId(chatId);
chatHistory.setModelId(chat.getModelId());
// 9. 存储结果类JSON字符串
chatHistory.setContent(JSONUtil.toJsonStr(queryAICustomSQLVO));
try {
chatHistoryService.save(chatHistory);
} catch (Exception e) {
notifyMessageEnd(loginUser.getId());
return;
}
// 10. 利用webSocket发送消息通知
AskSQLWebSocketMsgVO res = AskSQLWebSocketMsgVO.builder()
.res(queryAICustomSQLVO.getRes())
.columns(queryAICustomSQLVO.getColumns())
.type("running")
.sql(sql)
.build();
askSQLWebSocket.sendOneMessage(loginUser.getId(), res);
// 11. 通知结束
notifyMessageEnd(loginUser.getId());
}
这个方法是处理用户SQL查询请求的核心逻辑。它执行以下步骤:
- 验证模型ID是否存在。
- 获取数据源的所有元数据。
- 构造请求AI的输入。
- 持久化用户的消息。
- 通过WebSocket通知前端开始处理。
- 询问AI,获取返回的SQL语句。
- 执行SQL并获取结果。
- 将查询结果持久化。
- 存储结果类为JSON字符串。
- 通过WebSocket发送查询结果。
- 通知查询结束。
getAskAIWithDataTablesAndFieldsRequests方法
/**
* 查询对应数据源所有元数据(表信息、表字段)
* @param loginUser
* @param datasourceId
* @return
*/
private List<AskAIWithDataTablesAndFieldsRequest> getAskAIWithDataTablesAndFieldsRequests(User loginUser, Long datasourceId) {
List<CoreDatasetTable> tables = coreDatasourceService.getTablesByDatasourceId(datasourceId, loginUser);
ThrowUtils.throwIf(tables.isEmpty(), ErrorCode.PARAMS_ERROR, "数据源暂无数据");
List<AskAIWithDataTablesAndFieldsRequest> dataTablesAndFieldsRequests = new ArrayList<>();
tables.forEach(table -> {
// 查询所有字段
LambdaQueryWrapper<CoreDatasetTableField> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(CoreDatasetTableField::getDatasetTableId, table.getId());
List<CoreDatasetTableField> tableFields = coreDatasetTableFieldService.list(wrapper);
AskAIWithDataTablesAndFieldsRequest askAIWithDataTablesAndFieldsRequest = AskAIWithDataTablesAndFieldsRequest.builder()
.tableId(table.getId())
.tableComment(table.getName())
.tableName(table.getTableName())
.coreDatasetTableFieldList(tableFields)
.build();
dataTablesAndFieldsRequests.add(askAIWithDataTablesAndFieldsRequest);
});
return dataTablesAndFieldsRequests;
}
这个方法用于查询给定数据源的所有表信息和表字段。它遍历所有表,为每个表查询字段信息,并构建一个包含这些信息的请求列表。
buildAskAISQLInput方法
/**
* 构造智能问数的问题
* @param dataTablesAndFieldsRequests 数据源元数据
* @param question
* @return
* 示例:
* 分析需求:%s,
* [
* {表名: %s, 表注释: %s, 字段列表:[{%s}、{%s}]}
* {表名: %s, 表注释: %s, 字段列表:[{%s}、{%s}]}
* ]
*/
private String buildAskAISQLInput(List<AskAIWithDataTablesAndFieldsRequest> dataTablesAndFieldsRequests, String question) {
StringBuilder res = new StringBuilder();
// 1. 构造需求
res.append(String.format(ANALYSIS_QUESTION, question));
res.append(SPLIT);
// 2. 构造表与字段信息
StringBuilder tablesAndFields = new StringBuilder();
dataTablesAndFieldsRequests.forEach(tableAndFields -> {
// 构造当前表字段列表
StringBuilder tableFieldsInfo = new StringBuilder();
List<CoreDatasetTableField> fieldList = tableAndFields.getCoreDatasetTableFieldList();
fieldList.forEach(field -> {
tableFieldsInfo.append(String.format(FIELDS_INFO, field.getOriginName(), field.getName(), field.getType()));
tableFieldsInfo.append(SPLIT);
});
// 构造当前表信息
String tableFieldsInfoList = String.format(LIST_INFO, tableFieldsInfo);
tablesAndFields.append(String.format(TABLE_INFO, tableAndFields.getTableName(), tableAndFields.getTableComment(), tableFieldsInfoList));
tableFieldsInfo.append(SPLIT);
});
res.append(String.format(TABLES_AND_FIELDS_PART, tablesAndFields));
return res.toString();
}
这个方法用于构造智能问数的问题。它将用户的问题和数据源的元数据结合起来,形成一个格式化的字符串,该字符串将作为AI的输入。
doAskSQLWithKimi方法
/**
* 执行智能问数
* @param message 构造的输入
* @param limitSize select 结果限制的行数
* @return
*/
public String doAskSQLWithKimi(String message, int limitSize) {
String SQLPrompt = "你是一个MySQL数据库专家,专门负责根据查询需求得出SQL查询语句,接下来我会按照以下固定格式给你提供内容: \n" +
"分析需求:{分析需求或者目标} \n" +
"所有的数据表元数据:[{数据库表名、表注释、数据库表的字段、注释以及类型}] \n" +
"请根据这两部分内容,按照以下指定格式生成内容(此外不要输出任何多余的开头、结尾、注释),并且只生成Select语句!!!, 请严格按照数据表元数据中存在的数据表和字段,不要查询不存在的表和字段\n" +
"要求select的结果不超过" + limitSize + "行";
List<Message> messages = CollUtil.newArrayList(
new Message(RoleEnum.system.name(), SQLPrompt),
new Message(RoleEnum.user.name(), message)
);
return moonshotAiClient.chat("moonshot-v1-32k",messages);
}
这个方法用于执行智能问数。它构造一个包含分析需求和数据表元数据的消息,然后通过调用AI客户端来获取SQL查询语句。
buildUserChatForSqlVO方法
/**
* 执行SQL并封装智能问数返回类
* @param datasourceId 数据源id
* @param sql 执行sql
* @return 智能问数返回类
*/
private QueryAICustomSQLVO buildUserChatForSqlVO(Long datasourceId, String sql) throws SQLException {
return datasourceEngine.execSelectSqlToQueryAICustomSQLVO(datasourceId, sql);
}
这个方法用于执行SQL语句并将结果封装到QueryAICustomSQLVO
对象中。它调用execSelectSqlToQueryAICustomSQLVO
方法,传入数据源ID和SQL语句,然后返回查询结果。
execSelectSqlToQueryAICustomSQLVO方法
/**
* 执行SQL语句并将列集合和记录犯规
* @param datasourceId 数据源id
* @param sql sql语句
* @param parameters 参数
* @return
*/
public QueryAICustomSQLVO execSelectSqlToQueryAICustomSQLVO(Long datasourceId, String sql, Object... parameters) throws SQLException {
int dsIndex = (int) (datasourceId % (dataSourceMap.size()));
// 获取对应连接池
DataSource dataSource = dataSourceMap.get(dsIndex);
QueryAICustomSQLVO queryAICustomSQLVO = new QueryAICustomSQLVO();
// 所有列
List<String> columns = new ArrayList<>();
// 所有结果
List<Map<String, Object>> res = new ArrayList<>();
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql);
// Set parameters to prevent SQL injection
for (int i = 0; i < parameters.length; i++) {
preparedStatement.setObject(i + 1, parameters[i]);
}
ResultSet rs = preparedStatement.executeQuery();
// Execute the query or update
// 处理查询结果
ResultSetMetaData rsmd = rs.getMetaData();
int columnCount = rsmd.getColumnCount();
for (int i = 1; i <= columnCount; i++) {
columns.add(rsmd.getColumnName(i));
}
while (rs.next()) {
Map<String, Object> resMap = new HashMap<>();
for (int i = 1; i <= columnCount; i++) {
resMap.put(rsmd.getColumnName(i), rs.getString(i));
}
res.add(resMap);
}
queryAICustomSQLVO.setSql(sql);
queryAICustomSQLVO.setColumns(columns);
queryAICustomSQLVO.setRes(res);
return queryAICustomSQLVO;
}
这个方法用于执行SQL查询并将结果集转换为一个包含列名和记录的QueryAICustomSQLVO
对象。它使用PreparedStatement
来设置参数,执行查询,并遍历结果集,将每一行的数据存储到一个Map中,然后将这些Map添加到结果列表中。
这些代码片段共同构成了DataLoom智能问数功能的核心实现。每个片段都扮演着处理用户请求、与数据库交互、以及与AI服务通信的重要角色。
未来展望
我们对DataLoom的未来充满期待。我们计划引入更多的智能功能,这些功能将在下面的几篇文章中介绍,例如智能仪表盘,智能图表分析报告等
项目快速启动
- 见GitHub:DataLoom 仓库
- 见Gitee地址:Gitee 仓库
如何贡献
如果你对 DataLoom 感兴趣并想做出贡献,欢迎提交 issue 或 Pull Request。我们非常欢迎开发者一起加入,共同改进这个项目。
- GitHub 地址:DataLoom 仓库
- Gitee地址:Gitee 仓库
- 你可以通过创建 Issue 来报告问题,或通过提交 PR 来贡献代码。
希望这篇文章能够激发你对 DataLoom 项目的兴趣!如果你喜欢这个项目,请给我们一个 Star ⭐️,这对我们来说意义重大!
请持续关注,后续文章也会发一些有关项目功能设计亮点介绍
项目地址:DataLoom GitHub 仓库
项目问题通过下面的联系方式进行沟通
邮箱:hardork@163.com
WX号: _hardork如果需要项目文档📄,可以联系WX号