文章目录
- 介绍
- 使用Zookeeper进行集群部署
- 统一ShardingJDBC和ShardingProxy配置
- 通过Zookeeper注册中心同步配置
- 直接使用ShardingProxy提供的JDBC驱动读取配置文件
介绍
开发者手册
在conf/server.yaml配置文件中有下面这一段配置,就是关于集群部署的
mode:
# type: standalone
type: Cluster
repository:
type: ZooKeeper
props:
namespace: governance_ds
server-lists: localhost:2181
retryIntervalMilliseconds: 500
timeToLiveSeconds: 60
maxRetries: 3
operationTimeoutMilliseconds: 500
ShardingSphere支持两种运行模式,Standalone独立模式和Cluster集群模式。
在Standalone独立模式下,ShardingSphere不需要考虑其他实例的影响,直接在内存中管理核心配置规则就可以了。如果把整个mode都注释掉,他就是ShardingSphere默认的运行模式。
而在Cluster集群模式下,ShardingSphere不光要考虑自己的配置规则,还需要考虑如何跟集群中的其他实例同步自己的配置规则。这就需要引入第三方组件来提供配置信息同步。ShardingSphere目前支持的配置中心包括:Zookeeper、etcd、Nacos、Consule。
但是在ShardingSphere分库分表的场景下,这些配置信息几乎不会变动,访问频率也不会太高。所以,最为推荐的,是基于CP架构的Zookeeper。
另外,如果应用的本地和Zookeeper中都有配置信息,那么ShardingSphere会以Zookeeper中的配置为准。
使用Zookeeper进行集群部署
接下来我们可以基于Zookeeper部署一下ShardingProxy集群,看一下ShardingSphere需要同步的配置有哪些。
我们只需要在本地部署一个Zookeeper,然后将server.yaml中的mode部分解除注释:
mode:
type: Cluster
repository:
type: ZooKeeper
props:
namespace: governance_ds
server-lists: localhost:2181
retryIntervalMilliseconds: 500
timeToLiveSeconds: 60
maxRetries: 3
operationTimeoutMilliseconds: 500
启动ShardingProxy服务后,可以看到Zookeeper注册中心的信息如下是:
namespace
├──rules # 全局规则配置
├──props # 属性配置
├──metadata # Metadata 配置
├ ├──${databaseName} # 逻辑数据库名称
├ ├ ├──schemas # Schema 列表
├ ├ ├ ├──${schemaName} # 逻辑 Schema 名称
├ ├ ├ ├ ├──tables # 表结构配置
├ ├ ├ ├ ├ ├──${tableName}
├ ├ ├ ├ ├ ├──...
├ ├ ├ ├──...
├ ├ ├──versions # 元数据版本列表
├ ├ ├ ├ ├──views # 视图结构配置
├ ├ ├ ├ ├ ├──${viewName}
├ ├ ├ ├ ├ ├──...
├ ├ ├ ├──${versionNumber} # 元数据版本号
├ ├ ├ ├ ├──dataSources # 数据源配置
├ ├ ├ ├ ├──rules # 规则配置
├ ├ ├ ├──...
├ ├ ├──active_version # 激活的元数据版本号
├ ├──...
├──nodes
├ ├──compute_nodes
├ ├ ├──online
├ ├ ├ ├──proxy
├ ├ ├ ├ ├──UUID # Proxy 实例唯一标识
├ ├ ├ ├ ├──....
├ ├ ├ ├──jdbc
├ ├ ├ ├ ├──UUID # JDBC 实例唯一标识
├ ├ ├ ├ ├──....
├ ├ ├──status
├ ├ ├ ├──UUID
├ ├ ├ ├──....
├ ├ ├──worker_id
├ ├ ├ ├──UUID
├ ├ ├ ├──....
├ ├ ├──process_trigger
├ ├ ├ ├──process_list_id:UUID
├ ├ ├ ├──....
├ ├ ├──labels
├ ├ ├ ├──UUID
├ ├ ├ ├──....
├ ├──storage_nodes
├ ├ ├──${databaseName.groupName.ds}
├ ├ ├──${databaseName.groupName.ds}
而在rules部分,就是我们配置的ShardingProxy的核心属性
- !AUTHORITY
provider:
type: ALL_PERMITTED
users:
- root@%:root
- sharding@%:sharding
- !TRANSACTION
defaultType: XA
providerType: Atomikos
- !SQL_PARSER
parseTreeCache:
initialCapacity: 128
maximumSize: 1024
sqlCommentParseEnabled: true
sqlStatementCache:
initialCapacity: 2000
maximumSize: 65535
而分库分表的信息,则配置在/governance_ds/metadata/sharding_db/versions/0/rules节点下
- !SHARDING
tables:
# 逻辑表
sys_user:
actualDataNodes: ds_${0..1}.sys_user${1..2}
# 分表策略
tableStrategy:
standard:
shardingColumn: uid
shardingAlgorithmName: sys_user_tab_alg
# 分布式主键生成策略
keyGenerateStrategy:
column: uid
keyGeneratorName: alg_snowflake
# 默认分库策略
defaultDatabaseStrategy:
standard:
shardingColumn: uid
shardingAlgorithmName: database_inline
# 默认分表策略
defaultTableStrategy:
none:
# 分片策略
shardingAlgorithms:
database_inline:
type: INLINE
props:
algorithm-expression: ds_${uid % 2}
sys_user_tab_alg:
type: INLINE
props:
algorithm-expression: sys_user$->{((uid+1)%4).intdiv(2)+1}
# 分布式主键生成策略
keyGenerators:
alg_snowflake:
type: COSID_SNOWFLAKE
统一ShardingJDBC和ShardingProxy配置
既然ShardingProxy可以通过Zookeeper同步配置信息,那么我们可不可以在ShardingJDBC中也采用Zookeeper的配置呢?当然是可以的。
通过Zookeeper注册中心同步配置
第一种简单的思路就是将ShardingProxy中的mod部分配置移植到之前的ShardingJDBC示例中。
将application.properties中的配置信息全部删除,只配置Zookeeper地址:
# 微服务中配置信息如下,如果使用yml配置文件方式就不需要spring.shardingsphere这个前缀,当然使用yml加上这个前缀也能正常运行,只是需要了解这一点
# 如果使用properties就需要再上方配置的基础上加上spring.shardingsphere前缀
spring.shardingsphere.mode.type=Cluster
spring.shardingsphere.mode.repository.type=ZooKeeper
spring.shardingsphere.mode.repository.props.namespace=governance_ds
spring.shardingsphere.mode.repository.props.server-lists=localhost:2181
spring.shardingsphere.mode.repository.props.retryIntervalMilliseconds=600
spring.shardingsphere.mode.repository.props.timeToLiveSecoonds=60
spring.shardingsphere.mode.repository.props.maxRetries=3
spring.shardingsphere.mode.repository.props.operationTimeoutMilliseconds=500
# 指定读取Zookeeper上的哪一个库。默认值是logic_db
# ShardingProxy的配置文件中默认配置的库是sharding_db
# 如果有多个数据库需要读取,用逗号隔开 spring.shardingsphere.database.name=sharding_hs_db,logic_db,sharding_db
spring.shardingsphere.database.name=sharding_hs_db
这里需要注意,如果是使用ShardingJDBC的方式,那么默认是会读取一个logic_db数据库,而ShardingProxy的配置中,默认的数据库名是sharding_db,就会造成微服务端在进行查询更新操作时没有想过的分片策略使用
org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory#create()
public static MetaDataContexts create(...) throws SQLException {
// instanceContext.getInstance().getMetaData() 这个方法判断是不是JDBCInstanceMetaData
// 我们使用ShardingJDBC就是会创建JDBCInstanceMetaData类型
Collection<String> databaseNames = instanceContext.getInstance().getMetaData() instanceof JDBCInstanceMetaData
// 是JDBC类型就相当于读取本地的配置 ,当前我们配置文件中只有zookeeper连接的配置信息,所以最终读取到一个默认值logic_db
? parameter.getDatabaseConfigs().keySet()
// 而ShardingProxy就相当于是去读取Zookeeper中metadata下的所有数据库
: persistService.getDatabaseMetaDataService().loadAllDatabaseNames();
......
}
上方中instanceContext.getInstance().getMetaData()
值的来源是ShardingSphereDataSource
类中
// org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource类
// 构造方法会调用下面这个方法
private ContextManager createContextManager(...) throws SQLException {
// 创建JDBCInstanceMetaData
InstanceMetaData instanceMetaData = InstanceMetaDataBuilderFactory.create("JDBC", -1);
......
}
// 进入到create()方法 下面使用SPI机制加载InstanceMetaDataBuilder接口的实现类
// 而InstanceMetaDataBuilder接口的实现类就只有 JDBCInstanceMetaDataBuilder 和 ProxyInstanceMetaDataBuilder
// 对应的就是SharingJDBC和ShardingProxy两种方式
public static InstanceMetaData create(String type, int port) {
return ((InstanceMetaDataBuilder)TypedSPIRegistry.getRegisteredService(InstanceMetaDataBuilder.class, type)).build(port);
}
上方中parameter.getDatabaseConfigs().keySet()
会读取到一个默认值logic_db
对应的源码在DatabaseNameSetter
类中
package org.apache.shardingsphere.spring.boot.schema;
public final class DatabaseNameSetter {
private static final String DATABASE_NAME_KEY = "spring.shardingsphere.database.name";
private static final String SCHEMA_NAME_KEY = "spring.shardingsphere.schema.name";
/**
* Get database name.
*
* @param environment spring boot environment
* @return schema name
*/
public static String getDatabaseName(final Environment environment) {
StandardEnvironment standardEnv = (StandardEnvironment) environment;
// 先读取spring.shardingsphere.database.name配置项的值
String databaseName = standardEnv.getProperty(DATABASE_NAME_KEY);
if (!Strings.isNullOrEmpty(databaseName)) {
return databaseName;
}
// 再去读取 spring.shardingsphere.schema.name 配置项的值
String schemaName = standardEnv.getProperty(SCHEMA_NAME_KEY);
// DefaultDatabase.LOGIC_NAME 默认值是 logic_db
return Strings.isNullOrEmpty(schemaName) ? DefaultDatabase.LOGIC_NAME : schemaName;
}
}
直接使用ShardingProxy提供的JDBC驱动读取配置文件
ShardingSphere还提供了自己的JDBC驱动
在我们的微服务中 classpath下增加一个config.yaml,然后将我们之前在ShardingProxy中的几个关键配置整合到一起
databaseName: sharding_hs_db
dataSources:
ds_0:
url: jdbc:mysql://localhost:3306/sharding_sphere1?serverTimezone=UTC&useSSL=false
username: root
password: 1234
connectionTimeoutMilliseconds: 30000
idleTimeoutMilliseconds: 60000
maxLifetimeMilliseconds: 1800000
maxPoolSize: 50
minPoolSize: 1
ds_1:
url: jdbc:mysql://localhost:3306/sharding_sphere2?serverTimezone=UTC&useSSL=false
username: root
password: 1234
connectionTimeoutMilliseconds: 30000
idleTimeoutMilliseconds: 60000
maxLifetimeMilliseconds: 1800000
maxPoolSize: 50
minPoolSize: 1
rules:
- !SHARDING
tables:
# 逻辑表
sys_user:
actualDataNodes: ds_${0..1}.sys_user${1..2}
# 分表策略
tableStrategy:
standard:
shardingColumn: uid
shardingAlgorithmName: sys_user_tab_alg
# 分布式主键生成策略
keyGenerateStrategy:
column: uid
keyGeneratorName: alg_snowflake
# 默认分库策略
defaultDatabaseStrategy:
standard:
shardingColumn: uid
shardingAlgorithmName: database_inline
# 默认分表策略
defaultTableStrategy:
none:
# 分片策略
shardingAlgorithms:
database_inline:
type: INLINE
props:
algorithm-expression: ds_${uid % 2}
sys_user_tab_alg:
type: INLINE
props:
algorithm-expression: sys_user$->{((uid+1)%4).intdiv(2)+1}
# 分布式主键生成策略
keyGenerators:
alg_snowflake:
type: COSID_SNOWFLAKE
# 注意,下方rules需要注释掉
rules:
- !AUTHORITY
users:
- root@%:root
- sharding@:sharding
provider:
type: ALL_PERMITTED
- !TRANSACTION
defaultType: XA
providerType: Atomikos
- !SQL_PARSER
sqlCommentParseEnabled: true
sqlStatementCache:
initialCapacity: 2000
maximumSize: 65535
parseTreeCache:
initialCapacity: 128
maximumSize: 1024
props:
max-connections-size-per-query: 1
kernel-executor-size: 16 # Infinite by default.
proxy-frontend-flush-threshold: 128 # The default value is 128.
proxy-hint-enabled: false
sql-show: false
check-table-metadata-enabled: false
# Proxy backend query fetch size. A larger value may increase the memory usage of ShardingSphere Proxy.
# The default value is -1, which means set the minimum value for different JDBC drivers.
proxy-backend-query-fetch-size: -1
proxy-frontend-executor-size: 0 # Proxy frontend executor size. The default value is 0, which means let Netty decide.
# Available options of proxy backend executor suitable: OLAP(default), OLTP. The OLTP option may reduce time cost of writing packets to client, but it may increase the latency of SQL execution
# and block other clients if client connections are more than `proxy-frontend-executor-size`, especially executing slow SQL.
proxy-backend-executor-suitable: OLAP
proxy-frontend-max-connections: 0 # Less than or equal to 0 means no limitation.
# Available sql federation type: NONE (default), ORIGINAL, ADVANCED
sql-federation-type: NONE
# Available proxy backend driver type: JDBC (default), ExperimentalVertx
proxy-backend-driver-type: JDBC
proxy-mysql-default-version: 8.0.15 # In the absence of schema name, the default version will be used.
proxy-default-port: 3307 # Proxy default port.
proxy-netty-backlog: 1024 # Proxy netty backlog.
然后,可以直接用JDBC的方式访问带有分库分表的虚拟库。
public class ShardingJDBCDriverTest {
@Test
public void test() throws ClassNotFoundException, SQLException {
String jdbcDriver = "org.apache.shardingsphere.driver.ShardingSphereDriver";
String jdbcUrl = "jdbc:shardingsphere:classpath:config.yaml";
String sql = "select * from sharding_hs_db.sys_user";
Class.forName(jdbcDriver);
try(Connection connection = DriverManager.getConnection(jdbcUrl);) {
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql);
while (resultSet.next()){
System.out.println("uid= "+resultSet.getLong("uid"));
}
}
}
}
启动时,发现报错了
Caused by: java.lang.IllegalStateException: dataSourceClassName can not be null.
看样子直接把配置从ShardingProxy中复制过来有点小问题,那边是不需要加的。
所以需要再现有数据源配置上加上dataSourceClassName
的配置,我这里先是使用com.mysql.cj.jdbc.Driver
结果报错了
java.lang.ClassCastException: com.mysql.cj.jdbc.Driver cannot be cast to javax.sql.DataSource
再通过查看开发手册,修改成了com.zaxxer.hikari.HikariDataSource
此时又报了新的错误
java.lang.NullPointerException: Can not find transaction manager of `XA`
接下来再解决XA事务管理器相关的问题,因为ShardingProxy默认 XA事务管理器 使用的是 Atomikos ,我们上方config.yaml配置文件中也是这个配置。所以我接下里导入相关依赖
<!--XA 分布式事务 -->
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-transaction-xa-core</artifactId>
<!-- 因为文本使用的是ShardingSphere 5.2.1这里和总版本对应上 -->
<version>5.2.1</version>
</dependency>
又报错了
Failed to instantiate [javax.sql.DataSource]: Factory method 'shardingSphereDataSource' threw exception; nested exception is java.lang.AbstractMethodError: com.atomikos.icatch.jta.JtaTransactionServicePlugin.beforeInit()V
从报错信息可以看出来是Atomikos源码包中有问题,接下来在进行解决
最终导入的依赖如下
<!--XA 分布式事务 -->
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-transaction-xa-core</artifactId>
<version>5.2.1</version>
<exclusions>
<exclusion>
<artifactId>transactions-jdbc</artifactId>
<groupId>com.atomikos</groupId>
</exclusion>
<exclusion>
<artifactId>transactions-jta</artifactId>
<groupId>com.atomikos</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- 版本滞后了 -->
<dependency>
<artifactId>transactions-jdbc</artifactId>
<groupId>com.atomikos</groupId>
<version>5.0.8</version>
</dependency>
<dependency>
<artifactId>transactions-jta</artifactId>
<groupId>com.atomikos</groupId>
<version>5.0.8</version>
</dependency>
终于执行通过
下方这种测试方法也可以
public class ShardingSphereDatasourceTest {
public static void main(String[] args) throws SQLException, ClassNotFoundException {
HikariDataSource dataSource = new HikariDataSource();
dataSource.setDriverClassName("org.apache.shardingsphere.driver.ShardingSphereDriver");
dataSource.setJdbcUrl("jdbc:shardingsphere:classpath:config.yaml");
// Class.forName("org.apache.shardingsphere.driver.ShardingSphereDriver");
// String jdbcUrl = "jdbc:shardingsphere:classpath:config.yaml";
// Connection conn = DriverManager.getConnection(jdbcUrl);
Connection conn = dataSource.getConnection();
String sql = "SELECT cid,cname,user_id,cstatus from course where cid=851198093910081536";
try {
//ShardingConnectioin
conn = dataSource.getConnection();
//ShardingStatement
Statement statement = conn.createStatement();
//ShardingResultSet
ResultSet result = statement.executeQuery(sql);
while (result.next()) {
System.out.println("result:" + result);
}
} catch (SQLException e) {
e.printStackTrace();
} finally {
if (null != conn) {
conn.close();
}
}
}
}