一、简介
官网:Apache ShardingSphere
下载地址:下载 :: ShardingSphere
快速入门:ShardingSphere-JDBC :: ShardingSphere
分库分表框架
ShardingSphere包括Sharding-JDBC、Sharding-Proxy、Sharding-Sidecar三个开源分布式数据库中间件解决犯案构成。
ShardingSphere-JDBC 采用无中心化架构,适用于 Java 开发的高性能的轻量级 OLTP(连接事务处理) 应用。
ShardingSphere-Proxy 提供静态入口以及异构语言的支持,适用于 OLAP(连接数据分析) 应用以及对分片数据库进行管理和运维的场景。
Apache ShardingSphere 是多接入端共同组成的生态圈。 通过混合使用 ShardingSphere-JDBC 和 ShardingSphere-Proxy,并采用同一注册中心统一配置分片策略,能够灵活的搭建适用于各种场景的应用系统,使得架构师更加自由地调整适合与当前业务的最佳系统架构。
比较:
Sharding-JDBC | Sharding-Proxy | Sharding-Sidecar | |
数据库 | 任意 | MYSQL | MYSQL |
连接消耗数 | 高 | 低 | 低 |
异构语言 | 仅JAVA | 任意 | 任意 |
性能 | 损耗低 | 损耗高 | 损耗低 |
中心化 | 是 | 否 | 是 |
静态入口 | 无 | 有 | 无 |
处理过程:
二、概念
1、分片
数据分片就是将一张数据量大的表拆分成多个结构相同的数据量小的表,例如1000w的order表,拆分成order_1、order_2。根据分库策略、分表策略分散到不同的库/表。
2、数据节点
分库分表中一个不可再分的最小数据单元,由数据源名称和数据表组成,例如order_1。
3、逻辑表
一组局域相同逻辑和数据表结构的总称。
例如将order表拆分成order_1、order_2,在数据库存在的带有数字(order_1)的表,但写sql的时候,是按照order来写的。此时的order就是逻辑表。
4、真实表
也就是上述说的在数据库存在的order_1、order_2真实存在的物理表。
5、分片键
用于分片的表中的字段。
例如根据order表中的order_id取模,决定这条数据存在于哪个库的哪个表中。order_id就是分片键。Sharding-JDBC支持多个字段作为分片键来分片。
6、分片算法
1)精确分片算法(PreciseShardingAlgorithm)
单个字段作为分片键,SQL中有 =
与 IN
等条件的分片,需要在标准分片策略(StandardShardingStrategy
)下使用。
2)范围分片算法(RangeShardingAlgorithm)
单个字段作为分片键,SQL中有 BETWEEN AND
、>
、<
、>=
、<=
等条件的分片,需要在标准分片策略(StandardShardingStrategy
)下使用。
3)复合分片算法(ComplexKeysShardingAlgorithm)
多个字段作为分片键的分片操作,同时获取到多个分片健的值,根据多个字段处理业务逻辑。需要在复合分片策略(ComplexShardingStrategy
)下使用。
4)Hint分片算法(HintShardingAlgorithm)
上边的算法中我们都是解析SQL
语句提取分片键,并设置分片策略进行分片。但有些时候我们并没有使用任何的分片键和分片策略,可还想将 SQL 路由到目标数据库和表,就需要通过手动干预指定SQL的目标数据库和表信息,这也叫强制路由。
7、分片策略
1)标准分片策略
适用于单分片键,此策略支持精确分片算法和范围分片算法两个分片算法。
精确分片算法是必选的,用于处理 =
和 IN
的分片。
范围分片算法是可选的,用于处理BETWEEN AND
, >
, <
,>=
,<=
条件分片,如果不配置范围分片算法,SQL中的条件等将按照全库路由处理。
2)复合分片策略
支持对 SQL语句中的 =
,>
, <
, >=
, <=
,IN
和 BETWEEN AND
的分片操作。不同的是它支持多分片键,具体分配片细节完全由应用开发者实现。
3)行表达式分片策略
支持对 SQL语句中的 =
和 IN
的分片操作,但只支持单分片键。这种策略通常用于简单的分片,不需要自定义分片算法,可以直接在配置文件中接着写规则。
4)Hint分片策略
Hint分片策略,对应上边的Hint分片算法,通过指定分片健而非从 SQL
中提取分片健的方式进行分片的策略。
8、分布式主键
参考:九种分布式ID方式
9、广播表
存在于所有的分片数据源中的表,表结构和表中数据在每个库中完全一致。如果修改某个表中的数据,所有的广播表数据跟着同步。
10、绑定表
分片规则一致的主表和子表。例如order订单表和order_detail订单详情表,按order_id分片。
如果不配置绑定表关系,在关联表时,会出现笛卡尔积。
当关联查询时,order表是主表,所有的计算都使用主表的策略,也就是order_detail分片相关计算也用的是主表的条件,所以要保证主表和子表的分片键相同。
11、SQL路由
1)分片路由
- 直接路由:只针对分库不分表的情况,可以避免SQL解析和之后的结果归并;
- 标准路由:不包括关联查询或者只包括绑定表(主表和子表有相同的sharding key)之间的关联查询;一般包括=,in和between and三种情况;
- 笛卡尔积:非绑定表之间的关联查询;性能最差;
2)广播路由
查询中不包括sharding key,这个是需要做全局扫描和结果归并的;
- 全库路由:主要包括对数据库的操作,比如SET指令等TCL指令;
- 全库表路由:包括不带分片键的DQL和DML,以及DDL等;
- 全实例路由:用于DCL操作(设置或更改数据库用户或角色权限),授权语句针对的是数据库的实例;
- 单播路由:用于获取某一真实表信息的场景,它仅需要从任意库中的任意真实表中获取数据即可,比如desc table;
- 阻断路由:用于屏蔽SQL对数据库的操作,比如use database;
参考资料:sharding-sphere之SQL路由
三、分库分表
例如一个4核16G的服务器,单库MYSQL并发在(QPS+TPS)超过2k,系统就会崩溃。一般控制在1k是较好的。
高并发:造成IO读写频繁
大数据量:MYSQL索引是B+TREE,大数据量会导致索引树庞大,造成查询缓慢,innodb的最大存储限制64TB
这就引出了分库分表来解决上述的问题,一张表的数据最好不超过500w。
1、水平拆分
统一表的数据拆到不同的库不同的表中。通过路由访问到具体的数据。拆分后的每个表结构保持一致。
2、垂直拆分
就是把一个有很多字段的表给拆分成多个表,或者是多个库上去。每个库表的结构都不一样,每个库表都包含部分字段。一般来说,可以根据业务维度进行拆分,如订单表可以拆分为订单、订单支持、订单地址、订单商品、订单扩展等表;也可以,根据数据冷热程度拆分,20%的热点字段拆到一个表,80%的冷字段拆到另外一个表。
3、不停机分库分表数据迁移
TODO : 这一部分有待于后续学习
利用mysql+canal做增量数据同步,利用分库分表中间件,将数据路由到对应的新表中。
利用分库分表中间件,全量数据导入到对应的新表中。
通过单表数据和分库分表数据两两比较,更新不匹配的数据到新表中。
数据稳定后,将单表的配置切换到分库分表配置上。
四、事务管理
官方地址:
柔性事务-Seata :: ShardingSphere
分布式事务 :: ShardingSphere
TODO : 关于seata的学习后续开展
1) 数据库事务:ACID
- 原子性(Atomicity)指事务作为整体来执行,要么全部执行,要么全不执行。
- 一致性(Consistency)指事务应确保数据从一个一致的状态转变为另一个一致的状态。
- 隔离性(Isolation)指多个事务并发执行时,一个事务的执行不应影响其他事务的执行。
- 持久性(Durability)指已提交的事务修改数据会被持久保存。
2) 本地事务
在不开启任何分布式事务管理器的前提下,各个数据节点各自管理自己的事务。 它们之间没有协调以及通信的能力,也并不互相知晓其他数据节点事务的成功与否。
- 完全支持非跨库事务,例如:仅分表,或分库但是路由的结果在单库中。
- 完全支持因逻辑异常导致的跨库事务。例如:同一事务中,跨两个库更新。更新完毕后,抛出空指针,则两个库的内容都能回滚。
- 不支持因网络、硬件异常导致的跨库事务。例如:同一事务中,跨两个库更新,更新完毕后、未提交之前,第一个库宕机,则只有第二个库数据提交。
3) 两阶段提交
XA协议最早的分布式事务模型是由X/Open国际联盟提出的X/Open Distributed Transaction Processing(DTP)模型,简称XA协议。
基于XA协议实现的分布式事务对业务侵入很小。
它最大的优势就是对使用方透明,用户可以像使用本地事务一样使用基于XA协议的分布式事务。 XA协议能够严格保障事务ACID特性。事务执行在过程中需要将所需资源全部锁定,它更加适用于执行时间确定的短事务。
对于长事务来说,整个事务进行期间对数据的独占,将导致对热点数据依赖的业务系统并发性能衰退明显。 因此,在高并发的性能至上场景中,基于XA协议的分布式事务并不是最佳选择。
- 支持数据分片后的跨库XA事务
- 两阶段提交保证操作的原子性和数据的强一致性
- 服务宕机重启后,提交/回滚中的事务可自动恢复
- SPI机制整合主流的XA事务管理器,默认Atomikos,可以选择使用Narayana和Bitronix
- 同时支持XA和非XA的连接池
- 提供spring-boot和namespace的接入端
不支持:
- 服务宕机后,在其它机器上恢复提交/回滚中的数据
3) 柔性事务
如果将实现了ACID的事务要素的事务称为刚性事务的话,那么基于BASE事务要素的事务则称为柔性事务。 BASE是基本可用、柔性状态和最终一致性这三个要素的缩写。
- 基本可用(Basically Available):保证分布式事务参与方不一定同时在线。
- 柔性状态(Soft state):允许系统状态更新有一定的延时。
- 最终一致性(Eventually consistent):通常是通过消息传递的方式保证系统的最终一致性。
在ACID事务中对隔离性的要求很高,在事务执行过程中,必须将所有的资源锁定。
柔性事务的理念则是通过业务逻辑将互斥锁操作从资源层面上移至业务层面。通过放宽对强一致性要求,来换取系统吞吐量的提升。
- 完全支持跨库分布式事务
- 支持RC隔离级别
- 通过undo快照进行事务回滚
- 支持服务宕机后的,自动恢复提交中的事务
依赖:
- 需要额外部署Seata-server服务进行分支事务的协调
待优化项 - ShardingSphere和Seata会对SQL进行重复解析
4)比较
本地事务 | 两阶段事务 | 柔性事务 | |
业务改造 | 无 | 无 | 实现接口 |
一致性 | 不支持 | 支持 | 最终一致 |
隔离性 | 不支持 | 支持 | 业务方保证 |
并发性能 | 无 | 严重衰退 | 略衰退 |
适合场景 | 业务方处理不一致 | 短事务、低并发 | 长事务、高并发 |
5)实例
<dependency>
<groupId>io.shardingsphere</groupId>
<artifactId>sharding-transaction-spring-boot-starter</artifactId>
<version>3.1.0</version>
</dependency>
@Service
public class UserService {
@Autowired
private UserMapper userMapper;
@Autowired
private OrderMapper orderMapper;
//XA事务
@ShardingTransactionType(TransactionType.XA)
@Transactional(rollbackFor = Exception.class)
public int saveUserOrder(User user, Order order) {
userMapper.addUser(user);
order.setUserid(user.getId());
orderMapper.addOrder(order);
return 1;
}
}
五、实战
1、依赖
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-jdbc-core</artifactId>
<version>4.0.0</version>
</dependency>
2、实体类
创建对应的mapper.java、mapper.xml
3、配置
application.properties
#端口号
server.port=1223
# nacos
spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
# mapper
mybatis.mapper-locations=classpath:mapper/*.xml
# 打印SQl
spring.shardingsphere.props.sql-show=true
sql.showSjSql=true
#0库
db.0.url=jdbc:mysql://localhost:3308/db0?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai&allowMultiQueries=true
db.0.username=root
db.0.password=root
#1库
db.1.url=jdbc:mysql://localhost:3308/db1?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai&allowMultiQueries=true
db.1.username=root
db.1.password=root
#数据源
db.driver=com.mysql.cj.jdbc.Driver
4、分库分表配置
shardingJdbc.conf
{
"bindingTables": [
"t_order,t_order_item"
],
"broadcastTables": [
"t_config"
],
"shardingAlgorithm": [
{
"logicTable": "t_order",
"actualDataNodes": "db$->{0..1}.t_order_$->{2023..2025}",
"db": {
"column": "id",
"algorithm": "db$->{ id % 2 }"
},
"table": {
"column": "year"
},
"primaryKey":{
"column":"id",
"type":"SNOWFLAKE"
}
},
{
"logicTable": "t_order_item",
"actualDataNodes": "db$->{0..1}.t_order_$->{2023..2025}",
"db": {
"column": "order_id",
"algorithm": "db$->{ order_id % 2 }"
},
"table": {
"column": "year"
},
"primaryKey":{
"column":"id",
"type":"SNOWFLAKE"
}
}
]
}
这里的bindingTables是绑定表,broadcastTables是广播表
根据0\1分库,2023-2025分表
db$->{0..1}.t_order_$->{2023..2025}
分库策略,id取模
"algorithm": "db$->{ id % 2 }"
分表策略,根据year字段判断
"table": {
"column": "year"
},
5、获取shardingJdbc.conf配置
PropertiesConfig.java
@Slf4j
public class PropertiesConfig {
public static String getShardingJdbcConfig() {
InputStream is = PropertiesConfig.class.getClassLoader().getResourceAsStream("shardingJdbc.conf");
try {
byte[] buf = new byte[1024];
int length = 0;
StringBuffer sb = new StringBuffer();
while ((length = is.read(buf)) != -1) {
sb.append(new String(buf, 0, length));
}
is.close();
return sb.toString();
} catch (IOException e) {
log.error("读取分库分表数据异常");
}
throw new NullPointerException();
}
}
6、分片算法
CustomTablePreciseShardingAlgorithm.java
public class CustomTablePreciseShardingAlgorithm implements PreciseShardingAlgorithm<Integer> {
@Override
public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Integer> shardingValue) {
String targetName = shardingValue.getLogicTableName();
return targetName + "_" + shardingValue.getValue();
}
}
这是一个自定义的表精确分片算法,实现了PreciseShardingAlgorithm
接口,并指定了泛型为Integer
。
在doSharding
方法中,根据传入的逻辑表名和分片键的值,确定要路由到的具体目标表名。算法的逻辑是将逻辑表名和分片键的值拼接起来作为目标表名。
这个自定义的表精确分片算法可以用于将某个逻辑表的数据根据分片键值路由到对应的物理表。具体的分片策略和目标表名的生成逻辑可以根据实际需求进行自定义实现。
这边拓展一个基于取模算法的精确分片
public class CustomTablePreciseShardingAlgorithm implements PreciseShardingAlgorithm<Integer> {
@Override
public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Integer> shardingValue) {
int shardingKey = shardingValue.getValue();
// 目标表的数量
int tableCount = availableTargetNames.size();
// 计算目标表的索引
int targetTableIndex = shardingKey % tableCount;
//根据目标索引选择对应的目标表名
List<String> targetTableNames = new ArrayList<>(availableTargetNames);
String targetTableName = targetTableNames.get(targetTableIndex);
return targetTableName;
}
}
7、id配置
SnowFlakeWordIdConfig
@Configuration
@Slf4j
public class SnowFlakeWordIdConfig {
static {
try {
InetAddress ip4 = Inet4Address.getLocalHost();
String addressIp = ip4.getHostAddress();
String workerId = Math.abs(addressIp.hashCode()) % 1024 + "";
System.setProperty("workerId", workerId);
} catch (UnknownHostException e) {
log.error("workerId错误");
}
}
}
8、数据源
DataSourceConfig
@Slf4j
@Configuration
@EnableTransactionManagement
public class DataSourceConfig {
//从properties中读取的配置
@Value("${db.driver}")
private String driver;
@Value("${db.0.url}")
private String url0;
@Value("${db.0.username}")
private String userName0;
@Value("${db.0.password}")
private String passWord0;
@Value("${db.1.url}")
private String url1;
@Value("${db.1.username}")
private String userName1;
@Value("${db.1.password}")
private String passWord1;
@Value("${sql.showSjSql}")
private String showSjSql;
//逻辑表
private final String LOGIC_TABLE = "logicTable";
//真实节点
private final String ACTUAL_DATA_NODES = "actualDataNodes";
private final String DB = "db";
private final String TABLE = "table";
private final String COLUMN = "column";
private final String ALGORITHM = "algorithm";
private final String PRIMARY_KEY = "primaryKey";
private final String TYPE = "type";
@Bean
public DataSourceTransactionManager transactionManager(
@Qualifier("buildDataSource") DataSource dataSource
) {
//将该数据源与事务管理器关联起来,@Qualifier注解指定要注入的数据源
log.info("加载数据库连接");
return new DataSourceTransactionManager(dataSource);
}
@Bean("sqlSessionFactory")
public SqlSessionFactory sqlSessionFactory() throws Exception {
//创建sqlSession工厂
MybatisSqlSessionFactoryBean sessionFactoryBean = new MybatisSqlSessionFactoryBean();
//获取数据源
sessionFactoryBean.setDataSource(buildDataSource());
//设置获取mapper地址
sessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/*.xml"));
return sessionFactoryBean.getObject();
}
@Bean
public DataSource buildDataSource() throws SQLException {
// 解析Sharding-JDBC配置信息
ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
String shardingJdbcConfig = PropertiesConfig.getShardingJdbcConfig();
JSONObject jsonObject = JSON.parseObject(shardingJdbcConfig);
// 获取绑定表和广播表的配置
JSONArray bindingTables = JSON.parseArray(jsonObject.get("bindingTables").toString());
shardingRuleConfig.setBindingTableGroups(bindingTables.toJavaList(String.class));
JSONArray broadcastTables = JSON.parseArray(jsonObject.get("broadcastTables").toString());
shardingRuleConfig.setBroadcastTables(broadcastTables.toJavaList(String.class));
// 遍历分片算法的配置
JSONArray shardingAlgorithm = JSON.parseArray(jsonObject.get("shardingAlgorithm").toString());
int size = shardingAlgorithm.size();
for (int i = 0; i < size; i++) {
JSONObject jsonObj = JSON.parseObject(shardingAlgorithm.get(i).toString());
String logicTable = jsonObj.getString(LOGIC_TABLE);
String actualDataNodes = jsonObj.getString(ACTUAL_DATA_NODES);
if (StringUtils.isBlank(logicTable)) {
throw new NullPointerException();
}
if (StringUtils.isBlank(actualDataNodes)) {
throw new NullPointerException();
}
// 创建TableRuleConfiguration对象
TableRuleConfiguration tableRule = new TableRuleConfiguration(logicTable, actualDataNodes);
// 设置数据库分片策略配置
if (jsonObj.containsKey(DB)) {
JSONObject db = jsonObj.getJSONObject(DB);
tableRule.setDatabaseShardingStrategyConfig(new InlineShardingStrategyConfiguration(db.getString(COLUMN), db.getString(ALGORITHM)));
}
// 设置表分片策略配置
if (jsonObj.containsKey(TABLE)) {
JSONObject table = jsonObj.getJSONObject(TABLE);
String tableField = table.getString(COLUMN);
if ("year".equalsIgnoreCase(tableField)) {
tableRule.setTableShardingStrategyConfig(new StandardShardingStrategyConfiguration(tableField, new CustomTablePreciseShardingAlgorithm()));
} else {
tableRule.setTableShardingStrategyConfig(new InlineShardingStrategyConfiguration(tableField, table.getString(ALGORITHM)));
}
}
// 设置键生成器配置
if (jsonObj.containsKey(PRIMARY_KEY)) {
JSONObject pk = jsonObj.getJSONObject(PRIMARY_KEY);
Properties p = new Properties();
p.setProperty("worker.id", System.getProperty("workerId"));
tableRule.setKeyGeneratorConfig(new KeyGeneratorConfiguration(pk.getString(TYPE), pk.getString(COLUMN), p));
}
// 将TableRuleConfiguration对象添加到分片规则配置中
shardingRuleConfig.getTableRuleConfigs().add(tableRule);
}
// 打印sql
Properties p = new Properties();
p.setProperty("sql.show", showSjSql);
// 使用数据源映射和分片规则配置创建分片数据源
DataSource dataSource = ShardingDataSourceFactory.createDataSource(
createDataSourceMap(),
shardingRuleConfig,
p
);
return dataSource;
}
private Map<String, DataSource> createDataSourceMap() {
Map<String, DataSource> result = new LinkedHashMap<>(2);
result.put("db0", ds0());
result.put("db1", ds1());
return result;
}
public DataSource ds0() {
DruidDataSource dataSource = new DruidDataSource();
dataSource.setUrl(url0);
dataSource.setUsername(userName0);
dataSource.setPassword(passWord0);
dataSource.setDriverClassName(driver);
return dataSource;
}
public DataSource ds1() {
DruidDataSource dataSource = new DruidDataSource();
dataSource.setUrl(url1);
dataSource.setUsername(userName1);
dataSource.setPassword(passWord1);
dataSource.setDriverClassName(driver);
return dataSource;
}
}
注解的理解:
Spring事务原理分析--@EnableTransactionManagement
9、测试
@Slf4j
@RestController
@RequestMapping("/user")
public class UserController {
@Autowired
OrderMapper orderMapper;
@Autowired
OrderItemMapper orderItemMapper;
@Autowired
ConfigMapper configMapper;
@GetMapping("add")
public void add() {
log.info("1111111");
for (int i = 1; i < 1000; i++) {
OrderDO order = new OrderDO();
order.setId(IdUtil.generateSnowFlakeId());
order.setNo("000" + i);
order.setCreateName("订单" + i);
order.setPrice(new BigDecimal(1));
order.setYear("2023");
orderMapper.insert(order);
OrderItemDO orderItem = new OrderItemDO();
orderItem.setId(IdUtil.generateSnowFlakeId());
orderItem.setOrderNo(order.getNo());
orderItem.setName("订单详情" + i);
orderItem.setPrice(new BigDecimal(1));
orderItem.setYear(order.getYear());
orderItemMapper.insert(orderItem);
}
}
@GetMapping("addB")
public void addB(){
for (int i = 1; i < 1000; i++) {
ConfigDO configDO = new ConfigDO();
configDO.setId(IdUtil.generateSnowFlakeId());
configDO.setLastModifyTime(new Date());
configDO.setCreateTime(new Date());
configDO.setRemark("1111");
configMapper.insert(configDO);
}
}
}