背景
当项目有很多数据源的时候,通常会在启动时就把数据源连接加载到缓存中。当数据源发生变更后,如何自动、实时地更新缓存中的数据源呢?如果是单个项目,直接调用接口方法即可;但如果涉及分布式部署的多个系统呢?
解决方案:
使用Redis的发布/订阅(Pub/Sub)作为轻量级消息队列,它可以实现实时通知、实时状态更新等功能;配合AOP,在数据源变更时自动发布消息,各节点订阅到消息后更新本地缓存的数据源。
下面结合代码写一个使用示例:
1.首先创建数据源对象
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.StringUtils;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* Persistent definition of a datasource, mapped to table {@code ed_datasource_info}.
*
* @author ws
* @since 2022-08-12
*/
@Getter
@Setter
@ToString
@Accessors(chain = true)
@TableName("ed_datasource_info")
public class DatasourceInfo implements Serializable {
private static final long serialVersionUID = 1L;
/** Primary key, generated with {@code IdType.AUTO}. */
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
/**
* Datasource code
*/
@TableField("datasource_code")
private String datasourceCode;
/**
* Datasource name
*/
@TableField("datasource_name")
private String datasourceName;
/**
* Datasource type
*/
@TableField("datasource_type")
private String datasourceType;
/**
* Kind: 0 = database, 1 = Rest-api
*/
@TableField("type")
private Integer type;
/**
* Creator
*/
@TableField("creator")
private String creator;
/**
* Schema
*/
@TableField("schema_name")
private String schemaName;
/** Mapped to column {@code create_time}. */
@TableField("create_time")
private Date createTime;
/** Mapped to column {@code update_time}. */
@TableField("update_time")
private Date updateTime;
/**
* Datasource connection information (a JSON string)
*/
@TableField("link_json")
private String linkJson;
}
2.初始化启动加载数据源
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.sztech.common.constant.DataSourceTypeEnum;
import com.sztech.entity.DatasourceInfo;
import com.sztech.service.DatasourceInfoService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class DataSourceRecovery implements InitializingBean {
@Resource
private DatasourceInfoService datasourceInfoService;
@Override
public void afterPropertiesSet() throws Exception {
refresh();
}
private void refresh() throws Exception{
this.refresh(null);
}
public void refresh(String sourceCode){
QueryWrapper<DatasourceInfo> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("type", DataSourceTypeEnum.DB.getKey());
if(StringUtils.isNotBlank(sourceCode)){
queryWrapper.eq("datasource_code",sourceCode);
}
List<DatasourceInfo> list = datasourceInfoService.list(queryWrapper);
if(CollectionUtils.isEmpty(list)){
return;
}
CountDownLatch countDownLatch = new CountDownLatch(list.size());
for(DatasourceInfo datasourceInfo : list){
new Thread(new ReadloadThread(datasourceInfo, countDownLatch)).start();
}
try {
countDownLatch.await(1,TimeUnit.MINUTES);
} catch (InterruptedException e) {
log.error("数据源加载等待超时",e);
}
}
/**
* 多线程加载数据源,提高启动速度
*/
static class ReadloadThread implements Runnable {
private DatasourceInfo datasourceInfo;
private CountDownLatch countDownLatch;
public ReadloadThread() {
}
public ReadloadThread(DatasourceInfo datasourceInfo,CountDownLatch countDownLatch) {
this.datasourceInfo = datasourceInfo;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
DataSourceContext.setClientMap(datasourceInfo);
DataSourceContext.setConfigMap(datasourceInfo.getDatasourceCode(),datasourceInfo);
}catch (Exception e){
log.error("datasource:{},加载失败",datasourceInfo.getDatasourceCode(),e);
}finally {
countDownLatch.countDown();
}
}
}
}
3.创建DataSourceContext,用于缓存数据源连接和数据源配置
import com.sztech.core.tool.DBTool;
import com.sztech.entity.DatasourceInfo;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Static cache of opened datasource clients and datasource configurations,
 * both keyed by datasource code.
 *
 * User: wangsheng
 * Date: 2022-02-11
 * Time: 14:05
 */
public class DataSourceContext {
    /**
     * Client cache
     */
    private final static Map<String, IClient> clientMap = new ConcurrentHashMap<>();
    /**
     * Datasource configuration cache
     */
    private final static Map<String, DatasourceInfo> configMap = new ConcurrentHashMap<>();

    /**
     * Builds a client for the datasource and caches it under its datasource code,
     * closing the client that was cached under that code before, if any.
     *
     * @param datasourceInfo datasource to build a client for
     * @throws NullPointerException if the datasource code is {@code null}
     */
    public static void setClientMap(DatasourceInfo datasourceInfo) {
        String code = datasourceInfo.getDatasourceCode();
        if (code == null) {
            // Fail before a client is built: the map rejects null keys anyway.
            throw new NullPointerException("datasourceCode must not be null");
        }
        // Build first, so a failed build leaves the previously cached client open and usable.
        IClient fresh = DBTool.buildClient(datasourceInfo);
        // put() hands back the replaced client atomically; no containsKey/get race.
        closeQuietly(clientMap.put(code, fresh));
    }

    /**
     * Caches a datasource configuration.
     *
     * @param key            datasource code
     * @param datasourceInfo configuration to cache
     */
    public static void setConfigMap(String key, DatasourceInfo datasourceInfo) {
        configMap.put(key, datasourceInfo);
    }

    /**
     * Removes the cached client for the given code and closes it.
     *
     * @param key datasource code
     */
    public static void removeClientMap(String key) {
        // remove() returns the evicted client atomically, so exactly one caller closes it.
        closeQuietly(clientMap.remove(key));
    }

    /**
     * Removes the cached configuration for the given code.
     *
     * @param key datasource code
     */
    public static void removeConfigMap(String key) {
        configMap.remove(key);
    }

    /**
     * Returns the cached client for the given code.
     *
     * @param key datasource code
     * @return the cached client, never {@code null}
     * @throws RuntimeException if no client is cached under {@code key}
     */
    public static IClient getClientMap(String key) {
        IClient client = clientMap.get(key);
        if (null == client) {
            throw new RuntimeException(String.format("数据源编码:[%s]不存在或被删除...", key));
        }
        return client;
    }

    /**
     * Returns the cached configuration for the given code.
     *
     * @param key datasource code
     * @return the cached configuration, never {@code null}
     * @throws RuntimeException if no configuration is cached under {@code key}
     */
    public static DatasourceInfo getConfigMap(String key) {
        DatasourceInfo datasourceInfo = configMap.get(key);
        if (null == datasourceInfo) {
            throw new RuntimeException(String.format("数据源编码:[%s]不存在或被删除...", key));
        }
        return datasourceInfo;
    }

    /** Closes a client that has left the cache; {@code null} is a no-op. */
    private static void closeQuietly(IClient client) {
        if (client == null) {
            return;
        }
        try {
            client.close();
        } catch (Exception ignored) {
            // Best effort: the client is already out of the cache, a failed close must not
            // prevent the replacement or removal from taking effect.
        }
    }
}
package com.sztech.core.tool;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.odps.Instance;
import com.sztech.common.constant.ResultEnum;
import com.sztech.common.exception.BizException;
import com.sztech.common.utils.ReflectionUtils;
import com.sztech.common.utils.SpringUtils;
import com.sztech.common.utils.ThreadPoolUtil;
import com.sztech.core.datasource.DataSourceContext;
import com.sztech.core.datasource.IClient;
import com.sztech.core.datasource.rdbms.RdbmsConfig;
import com.sztech.entity.*;
import com.sztech.pojo.dto.ColumnDto;
import com.sztech.pojo.dto.QueryTableDto;
import com.sztech.pojo.dto.TableDto;
import com.sztech.pojo.node.PartitionColumn;
import com.sztech.pojo.vo.*;
import com.sztech.service.CreateTableLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Description:
* User: wangsheng
* Date: 2022-08-12
* Time: 16:59
*/
@Slf4j
public class DBTool {
/**
* 建立客户端
*/
public static IClient buildClient(DatasourceInfo datasourceInfo) {
IClient client = ReflectionUtils.getInstanceFromCache(datasourceInfo.getDatasourceType(), "type", IClient.class);
return client.open(datasourceInfo);
}
/**
* 测试数据源
*
* @return
*/
public static boolean testSource(DatasourceInfo datasourceInfo) {
IClient client = ReflectionUtils.getInstanceFromCache(datasourceInfo.getDatasourceType(), "type", IClient.class);
return client.testSource(datasourceInfo);
}
/**
 * Lists the schema names reported by the datasource's JDBC metadata.
 * A dedicated connection is opened through {@link DriverManager} and closed before returning.
 *
 * @param datasourceInfo datasource whose link JSON supplies the JDBC URL and credentials
 * @return the values of the {@code TABLE_SCHEM} column of {@code DatabaseMetaData.getSchemas()}
 * @throws RuntimeException wrapping the {@link SQLException} or {@link ClassNotFoundException}
 *                          raised while loading the driver, connecting or reading metadata
 */
public static List<String> getSchemas(DatasourceInfo datasourceInfo) {
    List<String> schemas = new ArrayList<>();
    Connection conn = null;
    try {
        IClient client = ReflectionUtils.getInstanceFromCache(datasourceInfo.getDatasourceType(), "type", IClient.class);
        Class.forName(client.driverName());
        RdbmsConfig rdbmsConfig = JSONObject.parseObject(datasourceInfo.getLinkJson()).toJavaObject(RdbmsConfig.class);
        conn = DriverManager.getConnection(rdbmsConfig.getJdbcUrl(), rdbmsConfig.getUsername(), rdbmsConfig.getDecodePassword());
        try (ResultSet resultSet = conn.getMetaData().getSchemas()) {
            while (resultSet.next()) {
                schemas.add(resultSet.getString("TABLE_SCHEM"));
            }
        }
    } catch (SQLException | ClassNotFoundException e) {
        throw new RuntimeException("获取数据源[" + datasourceInfo.getDatasourceCode() + "]的schema失败", e);
    } finally {
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException ex) {
                // Closing is best effort: the schemas were already read (or the real error is in flight).
                log.warn("关闭数据源连接失败", ex);
            }
        }
    }
    return schemas;
}
/**
* 获取驱动名称
*/
public static String getDriverName(String datasourceType) {
IClient client = ReflectionUtils.getInstanceFromCache(datasourceType, "type", IClient.class);
return client.driverName();
}
/**
* 获取表中列信息
*/
public static List<ColumnDto> getColumns(String datasourceCode, String tableName) {
return DataSourceContext.getClientMap(datasourceCode).getColumns(tableName);
}
/**
* 获取表中分区列信息
*/
public static List<String> getPartitionColumns(String datasourceCode, String tableName) {
return DataSourceContext.getClientMap(datasourceCode).getPartitionColumns(tableName);
}
/**
* 获取表信息
*/
public static List<String> getTableNames(String datasourceCode, String tableNameLike) {
return DataSourceContext.getClientMap(datasourceCode).getTableNames(tableNameLike);
}
/**
* 获取表信息
*/
public static List<TableDto> getTables(String datasourceCode) {
return DataSourceContext.getClientMap(datasourceCode).getTables();
}
/**
* 获取单个表信息
*/
public static TableDto getTableByName(String datasourceCode, String tableName) {
return DataSourceContext.getClientMap(datasourceCode).getTableByName(tableName);
}
/**
* 获取单个表信息(创建时间,字段数)
*/
public static TableDto getTableField(String datasourceCode, String tableName) {
return DataSourceContext.getClientMap(datasourceCode).getTableField(tableName);
}
/**
* 获取表信息(获取创建时间)
*
* @param dto
* @return
*/
public static TableInfoVo getTableData(QueryTableDto dto) {
IClient client = DataSourceContext.getClientMap(dto.getDataSourceCode());
return client.getTableInfo(dto.getTableName());
}
/**
* 根据字段type建表
*/
public static void createTableByColumns(List<ColumnDto> columnDtos, String tableName, String datasourceCode) {
IClient client = DataSourceContext.getClientMap(datasourceCode);
List<String> sqls = client.buildTableSql(columnDtos, tableName, true);
log.info("执行建表语句为:" + JSON.toJSONString(sqls));
sqls.forEach(s -> client.executeCommandSyn(s, new HashMap<>()));
}
/**
* 根据字段type建表
*/
public static void createTableByNotTransformedColumns(List<ColumnDto> columnDtos, String tableName, String datasourceCode) {
IClient client = DataSourceContext.getClientMap(datasourceCode);
List<String> sqls = client.buildTableSql(columnDtos, tableName, false);
log.info("执行建表语句为:" + JSON.toJSONString(sqls));
sqls.forEach(s -> client.executeCommandSyn(s, new HashMap<>()));
}
/**
* 创建索引
* 注: oracle 索引名在整个库里必须唯一 否则建立失败
*
* @param datasourceCode 数据源编码
* @param tableName 表名
* @param filedNames filed1,filed2...
* @param unique 唯一
*/
public static void createIndex(String datasourceCode, String tableName, String filedNames, Boolean unique) {
DataSourceContext.getClientMap(datasourceCode).createIndex(tableName, filedNames, unique);
}
/**
* sql校验
*
* @param datasourceCode
* @param sql
* @param sourceType
* @return
*/
public static Map<String, Object> checkSql(String datasourceCode, String sql, String sourceType) {
IClient client = DataSourceContext.getClientMap(datasourceCode);
return client.checkSql(sql, sourceType);
}
/**
* 根据sql创建表
*
* @param datasourceCode
* @param sql
*/
public static void createTableWithSql(String datasourceCode, String sql) {
IClient client = DataSourceContext.getClientMap(datasourceCode);
log.info("执行建表语句为:" + JSON.toJSONString(sql));
client.executeCommandSyn(sql, new HashMap<>());
// DataSourceContext.getClientMap(datasourceCode).createTableWithSql(sql);
}
/**
* 删除表
*
* @param datasourceCode
* @param tableName
*/
public static void dropTable(String datasourceCode, String tableName) {
DataSourceContext.getClientMap(datasourceCode).dropTable(tableName);
}
/**
* 单表查询数据
*/
public static List<Map<String, Object>> selectDataFromTable(String datasourceCode, List<DataTableColumn> columns, String tableName, String search, Integer limit) {
IClient client = DataSourceContext.getClientMap(datasourceCode);
// 获取查询语句
String selectSql = client.getSelectSql(columns, tableName, search, limit);
log.info("执行语句:" + selectSql);
return client.selectDataFromTable(selectSql, null);
}
/**
* 单表查询数据
*/
public static List<Map<String, Object>> selectFromTable(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, Integer pageNum, Integer pageSize, MapSqlParameterSource params) {
IClient client = DataSourceContext.getClientMap(datasourceCode);
// 获取查询语句
String selectSql = client.getFormSelectSql(columns, searchColumns, tableName, search, pageNum, pageSize, params);
log.info("执行语句:" + selectSql);
return client.selectDataFromTable(selectSql, params);
}
/**
* 单表查询数据
*/
public static List<Map<String, Object>> selectFromForBackUp(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, Integer pageNum, Integer pageSize, MapSqlParameterSource params) {
IClient client = DataSourceContext.getClientMap(datasourceCode);
// 获取查询语句
String selectSql = client.selectFromForBackUp(columns, searchColumns, tableName, search, pageNum, pageSize, params);
log.info("执行语句:" + selectSql);
return client.selectDataFromTable(selectSql, params);
}
/**
* 单表查询数据
*/
public static List<Map<String, Object>> selectFromFile(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, Integer pageNum, Integer pageSize, MapSqlParameterSource params) {
IClient client = DataSourceContext.getClientMap(datasourceCode);
// 获取查询语句
String selectSql = client.getFormSelectSqlForFile(columns, searchColumns, tableName, search, pageNum, pageSize, params);
log.info("执行语句:" + selectSql);
return client.selectDataFromTable(selectSql, params);
}
/**
* 查询单表是否存在文件名
*/
public static List<Map<String, Object>> getExistOldName(String datasourceCode, String tableName, String search) {
IClient client = DataSourceContext.getClientMap(datasourceCode);
// 获取查询语句
String selectSql = client.getExistOldName( tableName, search);
log.info("执行语句:" + selectSql);
return client.selectDataFromTable(selectSql, null);
}
/**
* 单表查询数据(查询归集表专门使用)
*/
public static List<Map<String, Object>> selectCollectTable(CollectConditionVo vo) {
IClient client = DataSourceContext.getClientMap(vo.getDatasourceCode());
// 获取查询语句
String selectSql = client.getCollectTable(vo);
log.info("执行语句:" + selectSql);
return client.selectDataFromTable(selectSql, vo.getParams());
}
/**
* 单表查询数据量
*/
public static Map<String, Object> getFormCount(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, MapSqlParameterSource params) {
IClient client = DataSourceContext.getClientMap(datasourceCode);
// 获取查询语句
String selectSql = client.getCountSql(columns, searchColumns, tableName, search, params);
log.info("执行语句:" + selectSql);
return client.getCount(selectSql, params);
}
/**
* Queries the row count of a table in a district/county database.
* Builds {@code select count(1) as count from <tableName>} and runs it through the
* cached client's {@code getCount}.
* NOTE(review): tableName is concatenated directly into the SQL text; presumably callers
* only pass trusted table names — confirm, since it cannot be bound as a parameter.
*/
public static Map<String, Object> getCountryCount(String datasourceCode, String tableName, MapSqlParameterSource params) {
IClient client = DataSourceContext.getClientMap(datasourceCode);
// Build the query statement
String selectSql ="select count(1) as count from "+tableName;
log.info("执行语句:" + selectSql);
return client.getCount(selectSql, params);
}
public static Map<String, Object> getFormCountForFile(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, MapSqlParameterSource params) {
IClient client = DataSourceContext.getClientMap(datasourceCode);
// 获取查询语句
String selectSql = client.getCountSqlForFile(columns, searchColumns, tableName, search, params);
log.info("执行语句:" + selectSql);
return client.getCount(selectSql, params);
}
/**
* 查询表数据量
*/
public static Long getTableRows(String datasourceCode, String tableName) {
IClient client = DataSourceContext.getClientMap(datasourceCode);
return client.getTableRows(tableName);
}
/**
* 查询表对应分区数据量
*/
public static Long getTablePartitionRows(String datasourceCode, String tableName, List<PartitionColumn> partitionColumns) {
IClient client = DataSourceContext.getClientMap(datasourceCode);
return client.getTablePartitionRows(tableName, partitionColumns);
}
/**
* 查询表数据量
*/
public static Integer getTablePhysicalSize(String datasourceCode, String tableName) {
IClient client = DataSourceContext.getClientMap(datasourceCode);
return client.getPhysicalSize(tableName);
}
/**
* 获取表最大值
*
* @param datasourceCode 数据源编码
* @param tableName 表名
* @param incColumnName 自增列名
* @return {@link Integer}
*/
public static Object getMaxValue(String datasourceCode, String tableName, String incColumnName, String condition) {
return DataSourceContext.getClientMap(datasourceCode).getMaxValue(tableName, incColumnName, condition);
}
public static Object getMaxValue(String datasourceCode, String schema, String tableName, String incColumnName, String condition) {
return DataSourceContext.getClientMap(datasourceCode).getMaxValue(schema, tableName, incColumnName, condition);
}
public static Object getMaxTime(String datasourceCode, String schema, String tableName, String incColumnName, String tongId,String condition) {
return DataSourceContext.getClientMap(datasourceCode).getMaxTime(schema, tableName, incColumnName,tongId, condition);
}
/**
* 字段存在
*
* @param datasourceCode 数据源编码
* @param tableName 表名
* @param fieldName 字段名
* @return {@link Boolean}
*/
public static Boolean fieldExist(String datasourceCode, String tableName, String fieldName) {
List<ColumnDto> columns = getColumns(datasourceCode, tableName);
return columns.stream().anyMatch(s -> s.getName().equalsIgnoreCase(fieldName));
}
/**
* 数据预览 获取前十条
*
* @return
*/
public static String dataView(String datasourceCode, String tableName, String condition) {
return DataSourceContext.getClientMap(datasourceCode).dataView(tableName, condition);
}
/**
* 创建分区临时表
* odps适用
*/
public static void createPartitionedTableByColumns(List<ColumnDto> columnDtos, String tableName, String tableComment, String partitionedField, String datasourceCode) {
DataSourceContext.getClientMap(datasourceCode).createPartitionedTableByColumns(columnDtos, tableName, tableComment, partitionedField);
}
/**
* 同步执行命令
*/
public static void executeCommandSyn(String datasourceCode, String command, Map<String, Object> params) {
DataSourceContext.getClientMap(datasourceCode).executeCommandSyn(command, params);
}
/**
* 异步执行命令
* odps适用
*/
public static Instance executeCommandASyn(String datasourceCode, String command, Map<String, Object> params) {
return DataSourceContext.getClientMap(datasourceCode).executeCommandASyn(command, params);
}
/**
* 是否有导出权限
* odps适用
*
* @param datasourceCode 数据源编码
* @param tableName 表名
* @return {@link Boolean}
*/
public static Boolean exportEnable(String datasourceCode, String tableName) {
return DataSourceContext.getClientMap(datasourceCode).exportEnable(tableName);
}
/**
* 插入单条数据
*
* @param datasourceCode
* @param vo
* @return
*/
public static Integer insert(String datasourceCode, FormTableVo vo) {
return DataSourceContext.getClientMap(datasourceCode).insert(vo);
}
/**
* 批量插入数据
*
* @param datasourceCode
* @param vo
* @return
*/
public static Integer[] betchInsert(String datasourceCode, FormTableVo vo) {
return DataSourceContext.getClientMap(datasourceCode).betchInsert(vo);
}
/**
* 批量插入数据
*
* @param datasourceCode
* @param vo
* @return
*/
public static Integer[] betchInsertByConnection(String datasourceCode, FormTableVo vo) {
return DataSourceContext.getClientMap(datasourceCode).betchInsertByConnection(vo);
}
/**
* 这个方法不需要分装参数,直接传字段名称list就好了
* @param datasourceCode
* @param vo
* @return
*/
public static Integer[] betchInsertForCommom(String datasourceCode, FormTableVo vo) {
return DataSourceContext.getClientMap(datasourceCode).betchInsertForCommom(vo);
}
/**
* 删除数据
*
* @param datasourceCode
* @param vo
* @return
*/
public static Integer delete(String datasourceCode, FormTableVo vo) {
return DataSourceContext.getClientMap(datasourceCode).delete(vo);
}
/**
* 这个删除方法可以自定义条件服号
* @param datasourceCode
* @param vo
* @return
*/
public static Integer deleteForCommon(String datasourceCode, FormTableVo vo) {
return DataSourceContext.getClientMap(datasourceCode).deleteForCommon(vo);
}
public static Integer deleteForFile(String datasourceCode, FormTableVo vo) {
return DataSourceContext.getClientMap(datasourceCode).deleteForFile(vo);
}
public static String deleteForPre(String datasourceCode, FormTableVo vo) {
return DataSourceContext.getClientMap(datasourceCode).deleteForPre(vo);
}
/**
* 修改数据
*
* @param datasourceCode
* @param vo
* @return
*/
public static Integer update(String datasourceCode, FormTableVo vo) {
return DataSourceContext.getClientMap(datasourceCode).update(vo);
}
/**
* 修改数据
*
* @param datasourceCode
* @param vo
* @return
*/
public static Integer updateForFile(String datasourceCode, FormTableVo vo) {
return DataSourceContext.getClientMap(datasourceCode).updateForFile(vo);
}
/**
* 获取表单基本信息
*
* @param vo
* @return
*/
public static TableMetaDataVo getTableBasicInfo(String datasourceCode, FormTableVo vo) {
return DataSourceContext.getClientMap(datasourceCode).getTableBasicInfo(vo);
}
/**
 * Creates a collect table: builds the DDL with {@code buildTableSqlForCollect} and executes
 * each statement synchronously on the cached client.
 * On failure the error is logged, an audit record is saved asynchronously, and a generic
 * {@link BizException} is thrown.
 *
 * @param columnDtos     column definitions
 * @param tableName      table to create; also stored as the audit record's code
 * @param datasourceCode code of the datasource to run the DDL on
 * @param tableComment   table comment passed to the SQL builder
 * @param ifPartition    partition flag passed to the SQL builder
 * @throws BizException if any statement fails
 */
public static void createCollectTable(List<CatalogColumnInfo> columnDtos, String tableName, String datasourceCode, String tableComment, Boolean ifPartition) {
    IClient client = DataSourceContext.getClientMap(datasourceCode);
    List<String> sqls = client.buildTableSqlForCollect(columnDtos, tableName, tableComment, ifPartition);
    log.info("执行建表语句为:" + JSON.toJSONString(sqls));
    try {
        sqls.forEach(s -> client.executeCommandSyn(s, new HashMap<>()));
    } catch (Exception e) {
        recordTableDdlFailure(e, sqls, tableName);
        throw new BizException(ResultEnum.ERROR.getCode(), "建表失败请联系管理员");
    }
}

/**
 * Updates a collect table: builds the statements with {@code buildTableSqlForUpdate} and
 * executes each one synchronously on the cached client.
 * Failure handling is the same as in {@code createCollectTable}.
 *
 * @param vo update request; supplies the datasource code and the table name
 * @throws BizException if any statement fails
 */
public static void updateCollectTable(CreateCollectVo vo) {
    IClient client = DataSourceContext.getClientMap(vo.getDatasourceCode());
    List<String> sqls = client.buildTableSqlForUpdate(vo);
    log.info("执行更新表语句为:" + JSON.toJSONString(sqls));
    try {
        sqls.forEach(s -> client.executeCommandSyn(s, new HashMap<>()));
    } catch (Exception e) {
        recordTableDdlFailure(e, sqls, vo.getTableName());
        log.info("建表失败了开始准备抛出了-------------------------------------->");
        throw new BizException(ResultEnum.ERROR.getCode(), "建表失败请联系管理员");
    }
}

/**
 * Logs a failed table DDL run with its stack trace and saves a {@code CreateTableLog}
 * record on the shared thread pool. Never throws, so the caller's BizException is not masked.
 */
private static void recordTableDdlFailure(Exception e, List<String> sqls, String tableCode) {
    String message = e instanceof BizException ? ((BizException) e).getMsg() : e.getMessage();
    // Trailing throwable argument makes SLF4J print the stack trace (replaces printStackTrace()).
    log.error("建表错误=======================>{}:", message, e);
    try {
        ThreadPoolUtil.instance().submit(() -> {
            try {
                CreateTableLog createTableLog = new CreateTableLog();
                createTableLog.setErrorLog(message);
                createTableLog.setParams(JSON.toJSONString(sqls));
                createTableLog.setCode(tableCode);
                SpringUtils.getBean(CreateTableLogService.class).save(createTableLog);
            } catch (Exception saveError) {
                // submit() would otherwise bury this exception inside the discarded Future.
                log.error("保存建表错误日志失败", saveError);
            }
        });
    } catch (RuntimeException rejected) {
        // e.g. pool saturated or shut down; the audit record is lost but the DDL error still surfaces.
        log.warn("提交建表错误日志任务失败", rejected);
    }
}
/**
* 获取数据源下所有表信息(包括表名,表字段数,表创建时间)
*
* @param datasourceCode
* @param tableNameLike
* @return
*/
public static List<TableDto> getTablesDetail(String datasourceCode, String tableNameLike, Integer start, Integer pageSize, String specifyTableName) {
return DataSourceContext.getClientMap(datasourceCode).getTablesDetail(tableNameLike, start, pageSize, specifyTableName);
}
/**
* 获取表数量
* @param datasourceCode
* @param tableName
* @return
*/
public static Long getTableCountSchema(String datasourceCode, String tableName) {
return DataSourceContext.getClientMap(datasourceCode).getTableCountSchema(tableName);
}
public static Integer getTableColumnCount(String dataSourceCode, String tableName) {
return DataSourceContext.getClientMap(dataSourceCode).getTableColumnCount(tableName);
}
public static Integer getPreTableColumnCount(String dataSourceCode, String tableName) {
return DataSourceContext.getClientMap(dataSourceCode).getPreTableColumnCount(tableName);
}
/**
* 获取符号
* @return
*/
public static String getSymbol(String datasourceCode) {
return DataSourceContext.getClientMap(datasourceCode).getSymbol();
}
}
import lombok.extern.slf4j.Slf4j;
import org.reflections.Reflections;
import java.lang.reflect.Modifier;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Reflection helpers for discovering the implementations of an interface or abstract class
 * and instantiating the one that matches a discriminator value.
 */
@Slf4j
public class ReflectionUtils {
    private static final Map<String, Set<?>> clazzMap = new ConcurrentHashMap<>();
    private static final ReentrantLock clazzLock = new ReentrantLock();

    /**
     * Finds all subtypes of an interface/abstract class by scanning the package of {@code clazz}.
     * Scan results are cached per class name to avoid repeated scans.
     * The interface or abstract class must live in the same package as its implementations
     * or in a parent package.
     *
     * @param clazz the interface or abstract class
     * @return the discovered subtypes; an empty, uncached set if the scan failed
     */
    @SuppressWarnings("unchecked")
    public static <T> Set<Class<? extends T>> getReflections(Class<T> clazz) {
        String key = clazz.getName();
        Set<?> cached = clazzMap.get(key);
        if (cached != null) {
            return (Set<Class<? extends T>>) cached;
        }
        // Acquire outside the try: if lock() itself failed, the finally block must not unlock.
        clazzLock.lock();
        try {
            // Re-check: another thread may have finished the scan while we waited for the lock.
            cached = clazzMap.get(key);
            if (cached != null) {
                return (Set<Class<? extends T>>) cached;
            }
            Reflections reflections = new Reflections(clazz.getPackage().getName());
            Set<Class<? extends T>> subTypesOf = reflections.getSubTypesOf(clazz);
            clazzMap.put(key, subTypesOf);
            return subTypesOf;
        } catch (Exception e) {
            log.error("getReflections error", e);
        } finally {
            clazzLock.unlock();
        }
        return new HashSet<>();
    }

    /**
     * Creates a new instance of the implementation whose discriminator method returns {@code type}.
     * Every non-abstract subtype of {@code clazz} is instantiated through its no-arg constructor
     * and {@code methodName} is invoked on it; the first instance whose result equals
     * {@code type} (case-insensitively) is returned.
     *
     * @param type       discriminator value to look for
     * @param methodName name of the public no-arg method that returns the discriminator
     * @param clazz      the interface or abstract class
     * @return a fresh instance of the matching implementation
     * @throws RuntimeException if no implementation matches
     */
    public static <T> T getInstance(String type, String methodName, Class<T> clazz) {
        for (Class<? extends T> candidate : getReflections(clazz)) {
            // Skip abstract classes
            if (Modifier.isAbstract(candidate.getModifiers())) {
                continue;
            }
            try {
                // Class.newInstance() is deprecated; go through the no-arg constructor instead.
                T instance = candidate.getDeclaredConstructor().newInstance();
                Object discriminator = candidate.getMethod(methodName).invoke(instance);
                if (type != null && discriminator != null
                        && type.equalsIgnoreCase(discriminator.toString())) {
                    // The probe is itself a fresh object, so it can be returned directly.
                    return instance;
                }
            } catch (Exception e) {
                log.error("getInstance error", e);
            }
        }
        throw new RuntimeException("implement class not exist: " + clazz.getName() + ", type=" + type);
    }

    /**
     * Creates a new instance of the implementation matching {@code type}.
     * Delegates to {@link #getInstance(String, String, Class)}; only the subtype scan is cached,
     * a new object is created on every call.
     *
     * @param type       discriminator value to look for
     * @param methodName name of the public no-arg method that returns the discriminator
     * @param clazz      the interface or abstract class
     * @return a fresh instance of the matching implementation
     */
    public static <T> T getInstanceFromCache(String type, String methodName, Class<T> clazz) {
        return getInstance(type, methodName, clazz);
    }
}
IClient客户端接口,用于适配多种数据源
import com.ws.websocket.entity.DatasourceInfo;
/**
* Description: client abstraction over a datasource.
* User: wangsheng
* Date: 2022-12-30
* Time: 10:31
*/
public interface IClient {
/**
* Connects to the datasource.
*
* @param dataSourceInfo datasource information
* @return {@link IClient}
*/
IClient open(DatasourceInfo dataSourceInfo);
/**
* Closes the datasource.
*/
void close();
/**
* Driver name.
*
* @return the driver name
*/
String driverName();
/**
* Datasource type.
*
* @return {@link String}
*/
String type();
/**
* Tests the datasource.
*
* @param datasourceInfo datasource information
* @return the test result
*/
boolean testSource(DatasourceInfo datasourceInfo);
}
import com.ws.websocket.entity.DatasourceInfo;
// Common query base class for IClient implementations.
public abstract class AbsClient implements IClient {
// Datasource definition available to subclasses.
protected DatasourceInfo datasourceInfo;
}
package com.ws.websocket.util;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.fastjson.JSONObject;
import com.ws.websocket.entity.DatasourceInfo;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
@Slf4j
public abstract class AbsRdbmsClient extends AbsClient {
protected DruidDataSource druidDataSource;
@Override
public IClient open(DatasourceInfo datasourceInfo) {
RdbmsConfig rdbmsConfig = JSONObject.parseObject(datasourceInfo.getLinkJson()).toJavaObject(RdbmsConfig.class);
DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setInitialSize(5);
druidDataSource.setMinIdle(30);
druidDataSource.setMaxActive(300);
druidDataSource.setMaxWait(10000);
druidDataSource.setBreakAfterAcquireFailure(true);// 跳出重试循环
druidDataSource.setConnectionErrorRetryAttempts(3);// 重试三次
druidDataSource.setTimeBetweenConnectErrorMillis(3000);
druidDataSource.setLoginTimeout(3);
druidDataSource.setUrl(rdbmsConfig.getJdbcUrl());
druidDataSource.setDriverClassName(driverName());
druidDataSource.setUsername(rdbmsConfig.getUsername());
//解密
// druidDataSource.setPassword(RsaUtils.decode(rdbmsConfig.getPassword()));
druidDataSource.setPassword(rdbmsConfig.getPassword());
// 设置 MetaUtil 工具类所需参数
Properties properties = new Properties();
properties.put("remarks", "true");
properties.put("useInformationSchema", "true");
druidDataSource.setConnectProperties(properties);
this.druidDataSource = druidDataSource;
this.datasourceInfo = datasourceInfo;
return this;
}
@Override
public void close() {
druidDataSource.close();
}
@Override
public boolean testSource(DatasourceInfo datasourceInfo) {
Connection connection = null;
try {
Class.forName(driverName());
String linkJson = datasourceInfo.getLinkJson();
RdbmsConfig rdbmsConfig = JSONObject.parseObject(linkJson).toJavaObject(RdbmsConfig.class);
connection = DriverManager.getConnection(rdbmsConfig.getJdbcUrl(), rdbmsConfig.getUsername(), rdbmsConfig.getPassword());
// 有效
if (connection.isValid(3)) {
return true;
} else {
return false;
}
} catch (SQLException e) {
log.error("数据源测试失败", e);
return false;
} catch (ClassNotFoundException e) {
log.error("未找到驱动信息:{}", driverName());
return false;
} finally {
if (connection != null) {
try {
connection.close();
} catch (SQLException ex) {
ex.printStackTrace();
}
}
}
}
@Data
class RdbmsConfig {
private String jdbcUrl;
private String username;
private String password;
public void setSSL() {
String lowerCase = this.jdbcUrl.toLowerCase();
if (!lowerCase.contains("usessl")) {
if (this.jdbcUrl.contains("?")) {
this.jdbcUrl = this.jdbcUrl + "&useSSL=false";
} else {
this.jdbcUrl = this.jdbcUrl + "?useSSL=false";
}
}
}
}
}
import com.alibaba.fastjson.JSONObject;
import com.ws.websocket.entity.DatasourceInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
/**
 * Client for datasources of type {@code DMDB}, using driver {@code dm.jdbc.driver.DmDriver}.
 * {@code close()} and {@code testSource()} are inherited from {@link AbsRdbmsClient}:
 * the previous empty {@code close()} override never released the connection pool, and the
 * previous {@code testSource()} override always reported failure.
 */
@Slf4j
public class DmClient extends AbsRdbmsClient {
    private String schema;

    @Override
    public String type() {
        return "DMDB";
    }

    @Override
    public String driverName() {
        return "dm.jdbc.driver.DmDriver";
    }

    /**
     * Resolves the schema, then opens the pool via the superclass.
     * The schema is the datasource's schema name when set, otherwise the upper-cased username
     * from the link JSON. The resolved schema is written back into {@code datasourceInfo}.
     *
     * @param datasourceInfo datasource to open; its schema name is updated in place
     * @return this client
     */
    @Override
    public IClient open(DatasourceInfo datasourceInfo) {
        RdbmsConfig commonLinkParams = JSONObject.parseObject(datasourceInfo.getLinkJson()).toJavaObject(RdbmsConfig.class);
        if (StringUtils.isNotBlank(datasourceInfo.getSchemaName())) {
            this.schema = datasourceInfo.getSchemaName();
        } else {
            // Locale.ROOT keeps the result independent of the JVM's default locale.
            this.schema = commonLinkParams.getUsername().toUpperCase(java.util.Locale.ROOT);
        }
        datasourceInfo.setSchemaName(schema);
        return super.open(datasourceInfo);
    }
}
4.创建redis订阅数据源操作频道配置
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
/**
 * Registers the Redis message listener that receives datasource change notifications.
 *
 * @author wangsheng
 * @since 2022/8/16 16:40
 */
@Slf4j
@Configuration
public class RedisListenerConfig {

    /** Name of the Redis channel on which datasource changes are published. */
    private static final String DATASOURCE_CHANNEL = "DATASOURCE_CHANNEL";

    /**
     * Subscribes the datasource monitor to the datasource operation channel.
     *
     * @param connectionFactory connectionFactory
     * @param dataSourceMonitor datasource monitor
     * @return RedisMessageListenerContainer
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            DataSourceMonitor dataSourceMonitor) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(dataSourceMonitor, new PatternTopic(DATASOURCE_CHANNEL));
        log.info("{} 订阅频道 {}", dataSourceMonitor.getClass().getName(), DATASOURCE_CHANNEL);
        return container;
    }
}
5.redis监听数据源操作
import com.alibaba.fastjson.JSONObject;
import com.ws.websocket.entity.DatasourceInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
 * Redis message listener that applies data-source changes to {@code DataSourceContext}.
 *
 * <p>Each message body is a UTF-8 JSON object with two entries: {@code key} names the
 * operation ({@code SAVE_OR_UPDATE} or {@code DELETE}) and {@code value} carries the payload
 * (a serialized {@code DatasourceInfo} for save/update, the data-source code for delete).
 *
 * @author wangsheng
 * @since 2022-08-12
 */
@Slf4j
@Component
public class DataSourceMonitor implements MessageListener {

    /**
     * Dispatches one received message to the matching handler.
     *
     * @param message the received Redis message; only its body is read
     * @param bytes   the pattern that matched the channel (unused)
     */
    @Override
    public void onMessage(Message message, byte[] bytes) {
        JSONObject box = JSONObject.parseObject(new String(message.getBody(), StandardCharsets.UTF_8));
        if (box == null) {
            // An empty body parses to null; there is nothing to apply.
            log.warn("redis 监听到空的数据源消息,已忽略");
            return;
        }
        String operation = box.getString("key");
        if ("SAVE_OR_UPDATE".equals(operation)) {
            onSaveOrUpdate(box);
        } else if ("DELETE".equals(operation)) {
            onDelete(box);
        } else {
            // Previously every non-save message was treated as a delete; an unknown or missing
            // operation must not evict cached data sources.
            log.warn("redis 监听到未知的数据源操作 {},已忽略", operation);
        }
    }

    /** Refreshes the cached configuration and client of a database-type data source. */
    private void onSaveOrUpdate(JSONObject box) {
        DatasourceInfo datasourceInfo = box.getObject("value", DatasourceInfo.class);
        if (datasourceInfo == null) {
            log.warn("redis 监听到的数据源新增或更新消息缺少数据源信息,已忽略");
            return;
        }
        // Only type 0 (database) sources are cached; the constant-first comparison is null-safe.
        if (Integer.valueOf(0).equals(datasourceInfo.getType())) {
            String datasourceCode = datasourceInfo.getDatasourceCode();
            DataSourceContext.setConfigMap(datasourceCode, datasourceInfo);
            DataSourceContext.setClientMap(datasourceInfo);
            log.info("redis 监听到数据源 {} 新增或更新,更新 DataSourceContext 完成", datasourceCode);
        }
    }

    /** Removes the cached configuration and client of the data source named in the message. */
    private void onDelete(JSONObject box) {
        String datasourceCode = box.getString("value");
        if (datasourceCode == null) {
            log.warn("redis 监听到的数据源删除消息缺少数据源编码,已忽略");
            return;
        }
        DataSourceContext.removeConfigMap(datasourceCode);
        DataSourceContext.removeClientMap(datasourceCode);
        log.info("redis 监听到数据源 {} 删除,更新 DataSourceContext 完成", datasourceCode);
    }
}
6.创建AOP自动监听数据源变化
import com.alibaba.fastjson.JSONObject;
import com.ws.websocket.entity.DatasourceInfo;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
/**
 * Aspect that publishes a Redis message whenever a data source is saved, updated or deleted.
 *
 * <p>Messages are JSON objects with a {@code key} (operation name) and a {@code value}
 * (payload), sent on the {@code DATASOURCE_CHANNEL} channel.
 *
 * <p>NOTE(review): the pointcuts below target {@code com.ws.service.DatasourceInfoService},
 * while other code in this project imports {@code com.ws.websocket.service.DatasourceInfoService};
 * confirm the package, otherwise the advice never matches.
 *
 * @author wangsheng
 * @since 2022/8/15 16:37
 */
@Slf4j
@Aspect
@Component
public class DatasourceAspect {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * Publishes a {@code SAVE_OR_UPDATE} message after a data source was created or edited.
     *
     * @param joinPoint      the advised join point (unused)
     * @param datasourceInfo the value returned by the advised method
     */
    @AfterReturning(value = "execution(* com.ws.service.DatasourceInfoService.saveOrUpdateDatasourceInfo(..))", returning = "datasourceInfo")
    public void saveOrUpdate(JoinPoint joinPoint, DatasourceInfo datasourceInfo) {
        if (datasourceInfo == null) {
            // Without a payload the message is useless to listeners and the log call below would NPE.
            log.warn("新增或更新数据源方法未返回数据源信息,跳过发布 Redis 消息");
            return;
        }
        publish("SAVE_OR_UPDATE", datasourceInfo);
        log.info("新增或更新数据源 {} 方法切面发布 Redis 消息完成", datasourceInfo.getDatasourceCode());
    }

    /**
     * Publishes a {@code DELETE} message after a data source was removed.
     *
     * @param joinPoint      the advised join point (unused)
     * @param datasourceCode the value returned by the advised method
     */
    @AfterReturning(value = "execution(* com.ws.service.DatasourceInfoService.deleteDatasourceInfo(..))", returning = "datasourceCode")
    public void delete(JoinPoint joinPoint, String datasourceCode) {
        if (datasourceCode == null) {
            // A delete message without a code cannot identify which data source to evict.
            log.warn("删除数据源方法未返回数据源编码,跳过发布 Redis 消息");
            return;
        }
        publish("DELETE", datasourceCode);
        log.info("删除数据源 {} 方法切面发布Redis消息完成", datasourceCode);
    }

    /** Serializes the operation and its payload to JSON and sends it on the data-source channel. */
    private void publish(String operation, Object payload) {
        Map<String, Object> box = new HashMap<>(4);
        box.put("key", operation);
        box.put("value", payload);
        stringRedisTemplate.convertAndSend("DATASOURCE_CHANNEL", JSONObject.toJSONString(box));
    }
}
这样就解决了数据源连接信息自动加载、更新和同步的问题。但还有一个问题:当数据源重启后,缓存的连接信息会失效,而AOP无法监听到数据源的重启,这时还需要一个定时任务定期对数据源进行连接测试,如果连接失效则重新建立连接并更新缓存。
7.创建定时任务
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.ws.websocket.entity.DatasourceInfo;
import com.ws.websocket.service.DatasourceInfoService;
import com.ws.websocket.util.DBTool;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
/**
 * Scheduled job that tests every database-type data source and, for each one whose test
 * does not succeed, publishes a {@code SAVE_OR_UPDATE} message on {@code DATASOURCE_CHANNEL}.
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class DataSourceRetryConnectSchedule {

    @Resource
    private DatasourceInfoService datasourceInfoService;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * Tests all data sources of type 0 (database) and publishes a message for the failing ones.
     *
     * <p>Triggered by the cron expression {@code 0 0 *}{@code /2 * * ?}, i.e. every two hours.
     */
    @Scheduled(cron = "0 0 */2 * * ?")
    public void RetryConnect() {
        log.info("开始监测数据源连接");
        QueryWrapper<DatasourceInfo> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("type", 0);
        List<DatasourceInfo> list = datasourceInfoService.list(queryWrapper);
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        for (DatasourceInfo datasourceInfo : list) {
            if (isConnectable(datasourceInfo)) {
                continue;
            }
            // Pass the name as an argument; concatenating it left a literal "{}" in the log line.
            log.info("数据源重连{}", datasourceInfo.getDatasourceName());
            HashMap<String, Object> box = new HashMap<>(4);
            box.put("key", "SAVE_OR_UPDATE");
            box.put("value", datasourceInfo);
            // Publish the Redis message.
            stringRedisTemplate.convertAndSend("DATASOURCE_CHANNEL", JSONObject.toJSONString(box));
        }
    }

    /**
     * Runs the connection test for one data source.
     *
     * <p>A {@code null} result or a runtime exception counts as "not connectable", so one
     * broken data source cannot abort the check of the remaining ones.
     */
    private boolean isConnectable(DatasourceInfo datasourceInfo) {
        try {
            return Boolean.TRUE.equals(DBTool.testSource(datasourceInfo));
        } catch (RuntimeException e) {
            log.warn("数据源 {} 连接测试异常,按连接失效处理", datasourceInfo.getDatasourceName(), e);
            return false;
        }
    }
}