Integrating Seata with Nacos, using PostgreSQL as the DB store
Environment
For version compatibility details, see:
https://github.com/alibaba/spring-cloud-alibaba/wiki/%E7%89%88%E6%9C%AC%E8%AF%B4%E6%98%8E
In my setup:
<spring.cloud.alibaba-version>2021.1</spring.cloud.alibaba-version>
so the matching Seata version is 1.3.0.
Deploying Seata with docker-compose
Prerequisite: Nacos and PostgreSQL are already installed.
1. Create a database named seata.
Run the following SQL in it:
-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS public.global_table
(
xid VARCHAR(128) NOT NULL,
transaction_id BIGINT,
status SMALLINT NOT NULL,
application_id VARCHAR(32),
transaction_service_group VARCHAR(32),
transaction_name VARCHAR(128),
timeout INT,
begin_time BIGINT,
application_data VARCHAR(2000),
gmt_create TIMESTAMP(0),
gmt_modified TIMESTAMP(0),
CONSTRAINT pk_global_table PRIMARY KEY (xid)
);
CREATE INDEX idx_status_gmt_modified ON public.global_table (status, gmt_modified);
CREATE INDEX idx_transaction_id ON public.global_table (transaction_id);
-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS public.branch_table
(
branch_id BIGINT NOT NULL,
xid VARCHAR(128) NOT NULL,
transaction_id BIGINT,
resource_group_id VARCHAR(32),
resource_id VARCHAR(256),
branch_type VARCHAR(8),
status SMALLINT,
client_id VARCHAR(64),
application_data VARCHAR(2000),
gmt_create TIMESTAMP(6),
gmt_modified TIMESTAMP(6),
CONSTRAINT pk_branch_table PRIMARY KEY (branch_id)
);
CREATE INDEX idx_xid ON public.branch_table (xid);
-- the table to store lock data
CREATE TABLE IF NOT EXISTS public.lock_table
(
row_key VARCHAR(128) NOT NULL,
xid VARCHAR(128),
transaction_id BIGINT,
branch_id BIGINT NOT NULL,
resource_id VARCHAR(256),
table_name VARCHAR(32),
pk VARCHAR(36),
status SMALLINT NOT NULL DEFAULT 0,
gmt_create TIMESTAMP(0),
gmt_modified TIMESTAMP(0),
CONSTRAINT pk_lock_table PRIMARY KEY (row_key)
);
comment on column public.lock_table.status is '0:locked ,1:rollbacking';
CREATE INDEX idx_branch_id ON public.lock_table (branch_id);
CREATE INDEX idx_lock_table_xid ON public.lock_table (xid); -- renamed: index names must be unique per schema, and idx_xid is already used on branch_table
CREATE INDEX idx_status ON public.lock_table (status);
CREATE TABLE distributed_lock (
lock_key VARCHAR(20) NOT NULL,
lock_value VARCHAR(20) NOT NULL,
expire BIGINT NOT NULL,
CONSTRAINT pk_distributed_lock_table PRIMARY KEY (lock_key)
);
INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);
Other SQL scripts are available at:
https://github.com/seata/seata/tree/develop/script/server/db
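If the seata database does not exist yet, one rough way to create it and load the script above is with the psql client; this is only a sketch reusing the connection settings that appear later in file.conf (host 101.35.249.216, user root) and assuming the SQL above was saved as seata-server.sql:
psql -h 101.35.249.216 -U root -c "CREATE DATABASE seata;"
psql -h 101.35.249.216 -U root -d seata -f seata-server.sql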
The directory layout on the server is sketched after step 3 below.
2. First run a Seata container once, just so the Seata configuration files can be copied out of it for later use:
docker run --name seata-server -p 8091:8091 -d seataio/seata-server:1.3.0
docker cp seata-server:/seata-server/resources /opt/seata/resources
3. Then stop and remove that temporary container:
docker stop seata-server
docker rm seata-server
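After these steps, /opt/seata ends up with roughly the following layout once the files from the steps below are added (a sketch, not a listing from the original server):
/opt/seata
├── resources/           # copied out of the container: file.conf, registry.conf, ...
├── config.txt           # created below, pushed to Nacos
├── script/
│   └── nacos-config.sh  # push script, created below
└── docker-compose.yml   # created below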
4. The modified file.conf (under /opt/seata/resources):
## transaction log store, only used in seata-server
store {
## store mode: file、db、redis
mode = "db"
## file store property
file {
## store location dir
dir = "sessionStore"
# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
maxBranchSessionSize = 16384
# globe session size , if exceeded throws exceptions
maxGlobalSessionSize = 512
# file buffer size , if exceeded allocate new buffer
fileWriteBufferCacheSize = 16384
# when recover batch read size
sessionReloadReadSize = 100
# async, sync
flushDiskMode = async
}
## database store property
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
## datasource = "druid"
## mysql/oracle/postgresql/h2/oceanbase etc.
dbType = "postgresql"
driverClassName = "org.postgresql.Driver"
url = "jdbc:postgresql://101.35.249.216:5432/seata"
user = "root"
password = "root"
minConn = 5
maxConn = 30
globalTable = "global_table"
branchTable = "branch_table"
lockTable = "lock_table"
queryLimit = 100
maxWait = 5000
}
## redis store property
redis {
host = "127.0.0.1"
port = "6379"
password = ""
database = "0"
minConn = 1
maxConn = 10
queryLimit = 100
}
}
The modified registry.conf:
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
nacos {
application = "seata-server"
serverAddr = "nacos.carbonease.cn:80"
group = "SEATA_GROUP"
namespace = "cfa514ed-1caf-4d2b-8645-8dbee0f3933f"
cluster = "default"
username = "nacos"
password = "Carbonease@2022"
}
eureka {
serviceUrl = "http://localhost:8761/eureka"
application = "default"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = 0
password = ""
cluster = "default"
timeout = 0
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
sessionTimeout = 6000
connectTimeout = 2000
username = ""
password = ""
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "nacos"
nacos {
serverAddr = "nacos.carbonease.cn:80"
namespace = "cfa514ed-1caf-4d2b-8645-8dbee0f3933f"
group = "SEATA_GROUP"
username = "nacos"
password = "Carbonease@2022"
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
appId = "seata-server"
apolloMeta = "http://192.168.1.204:8801"
namespace = "application"
}
zk {
serverAddr = "127.0.0.1:2181"
sessionTimeout = 6000
connectTimeout = 2000
username = ""
password = ""
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}
Create config.txt under /opt/seata:
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
service.vgroupMapping.my_test_tx_group=default
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
##change the store mode here from file to db (note: delete these ## comment lines before pushing, since the push script treats every non-empty line as a key)
store.mode=db
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
##database connection settings
store.db.datasource=druid
store.db.dbType=postgresql
store.db.driverClassName=org.postgresql.Driver
store.db.url=jdbc:postgresql://101.35.249.216:5432/seata
store.db.user=root
store.db.password=root
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
store.redis.host=127.0.0.1
store.redis.port=6379
store.redis.maxConn=10
store.redis.minConn=1
store.redis.database=0
store.redis.password=null
store.redis.queryLimit=100
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.log.exceptionRate=100
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898
service.vgroupMapping.sub-tx-group=default
service.vgroupMapping.admin-tx-group=default
Create a script directory under /opt/seata.
Inside the script directory, create a script named nacos-config.sh.
The content of nacos-config.sh:
#!/bin/sh
# Copyright 1999-2019 Seata.io Group.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
while getopts ":h:p:g:t:u:w:" opt
do
case $opt in
h)
host=$OPTARG
;;
p)
port=$OPTARG
;;
g)
group=$OPTARG
;;
t)
tenant=$OPTARG
;;
u)
username=$OPTARG
;;
w)
password=$OPTARG
;;
?)
echo " USAGE OPTION: $0 [-h host] [-p port] [-g group] [-t tenant] [-u username] [-w password] "
exit 1
;;
esac
done
if [ -z ${host} ]; then
host=localhost
fi
if [ -z ${port} ]; then
port=8848
fi
if [ -z ${group} ]; then
group="SEATA_GROUP"
fi
if [ -z ${tenant} ]; then
tenant=""
fi
if [ -z ${username} ]; then
username=""
fi
if [ -z ${password} ]; then
password=""
fi
nacosAddr=$host:$port
contentType="content-type:application/json;charset=UTF-8"
echo "set nacosAddr=$nacosAddr"
echo "set group=$group"
urlencode() {
length="${#1}"
i=0
while [ $length -gt $i ]; do
char="${1:$i:1}"
case $char in
[a-zA-Z0-9.~_-]) printf $char ;;
*) printf '%%%02X' "'$char" ;;
esac
i=`expr $i + 1`
done
}
failCount=0
tempLog=$(mktemp -u)
function addConfig() {
dataId=`urlencode $1`
content=`urlencode $2`
curl -X POST -H "${contentType}" "http://$nacosAddr/nacos/v1/cs/configs?dataId=$dataId&group=$group&content=$content&tenant=$tenant&username=$username&password=$password" >"${tempLog}" 2>/dev/null
if [ -z $(cat "${tempLog}") ]; then
echo " Please check the cluster status. "
exit 1
fi
if [ "$(cat "${tempLog}")" == "true" ]; then
echo "Set $1=$2 successfully "
else
echo "Set $1=$2 failure "
failCount=`expr $failCount + 1`
fi
}
count=0
for line in $(cat $(dirname "$PWD")/config.txt | sed s/[[:space:]]//g); do
count=`expr $count + 1`
key=${line%%=*}
value=${line#*=}
addConfig "${key}" "${value}"
done
echo "========================================================================="
echo " Complete initialization parameters, total-count:$count , failure-count:$failCount "
echo "========================================================================="
if [ ${failCount} -eq 0 ]; then
echo " Init nacos config finished, please start seata-server. "
else
echo " init nacos config fail. "
fi
Make the script executable:
chmod +x nacos-config.sh
Push the configuration to Nacos:
bash ./nacos-config.sh -h nacos.carbonease.cn -p 80 -g SEATA_GROUP -t cfa514ed-1caf-4d2b-8645-8dbee0f3933f
Command format:
sh nacos-config.sh -h <Nacos host> -p <Nacos port> -g SEATA_GROUP (the group name for the config entries; can be left as is) -t <the Seata namespace ID created in Nacos>
If Nacos auth is enabled, the script also accepts -u <username> -w <password>.
When the push succeeds, the script finishes with "Init nacos config finished, please start seata-server.".
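As an extra sanity check, you can read one of the pushed keys back through the Nacos v1 config API (a rough check; if Nacos auth is enabled you may first need to log in via /nacos/v1/auth/login and append the returned accessToken):
curl "http://nacos.carbonease.cn:80/nacos/v1/cs/configs?dataId=store.mode&group=SEATA_GROUP&tenant=cfa514ed-1caf-4d2b-8645-8dbee0f3933f"
It should print db.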
Create a docker-compose.yml file:
version: "3"
services:
seata-server:
image: seataio/seata-server:1.3.0
container_name: seata-server
hostname: seata-server
ports:
- "8091:8091"
environment:
- SEATA_PORT=8091
- STORE_MODE=db
- SEATA_IP=101.35.249.216 ## replace with the IP of the Docker host
restart: 'no'
volumes:
- "/usr/share/zoneinfo/Asia/Shanghai:/etc/localtime" #设置系统时区
- "/usr/share/zoneinfo/Asia/Shanghai:/etc/timezone" #设置时区
- "./resources:/seata-server/resources"
Start it:
docker-compose up -d
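Before looking at the Nacos console, it can help to tail the container log and confirm the server connected to PostgreSQL and Nacos without errors (a generic check, not part of the original write-up):
docker logs -f seata-server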
The Seata server registers itself in Nacos successfully!
Integrating a Spring Cloud project:
In the pom.xml:
<!-- Seata distributed transactions -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.3.0</version>
</dependency>
The client and server Seata versions must match, otherwise you may run into:
no available service 'default' found, please make sure registry config correct
In the application.yml:
seata:
application-id: ${spring.application.name}
enabled: true
tx-service-group: my_test_tx_group # this comes from service.vgroupMapping.my_test_tx_group=default in config.txt above
registry:
type: nacos
nacos:
server-addr: nacos.carbonease.cn:80 # Nacos address
namespace: cfa514ed-1caf-4d2b-8645-8dbee0f3933f # the Seata namespace ID created in Nacos
group: SEATA_GROUP # the group used for the Seata configuration
cluster: default
username: nacos
password: Carbonease@2022
config:
type: nacos
nacos:
server-addr: nacos.carbonease.cn:80 # Nacos address
namespace: cfa514ed-1caf-4d2b-8645-8dbee0f3933f # the Seata namespace ID created in Nacos
group: SEATA_GROUP # the group used for the Seata configuration
username: nacos
password: Carbonease@2022
service:
vgroup-mapping:
my_test_tx_group: default # this comes from service.vgroupMapping.my_test_tx_group=default in config.txt above
# note: my_test_tx_group here must match seata.tx-service-group above and the corresponding entry in config.txt
On JDKs newer than 8 (i.e. with the module system), the services need these startup arguments:
--add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.lang.reflect=ALL-UNNAMED
Otherwise you may see:
Unable to make protected final java.lang.Class java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain) throws java.lang.ClassFormatError accessible: module java.base does not "opens java.lang" to unnamed module @9d5509a
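For example, when launching a client service from the command line (the jar name is only a placeholder):
java --add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.lang.reflect=ALL-UNNAMED -jar your-service.jar
In an IDE, the same flags go into the VM options of the run configuration.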
Next, every business database involved in the global transaction needs the undo-log table:
-- undo log table
CREATE TABLE IF NOT EXISTS public.undo_log
(
id BIGSERIAL NOT NULL, -- auto-increment; Seata inserts undo records without specifying id
branch_id BIGINT NOT NULL,
xid VARCHAR(100) NOT NULL,
context VARCHAR(128) NOT NULL,
rollback_info bytea NOT NULL,
log_status INT NOT NULL,
log_created TIMESTAMP(6) NOT NULL,
log_modified TIMESTAMP(6) NOT NULL,
CONSTRAINT pk_undo_log PRIMARY KEY (id)
);
CREATE SEQUENCE IF NOT EXISTS undo_log_id_seq START 1;
Finally:
Service A calls Service B.
On the Service method in Service A (the transaction initiator), add the annotation:
@GlobalTransactional(rollbackFor = Exception.class)
On the Service method in Service B (the participant), add the annotation:
@Transactional(rollbackFor = Exception.class)
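Putting the two annotations together, a minimal sketch of the two services is shown below; OrderService, StorageService and the StorageClient Feign interface are hypothetical names for illustration, not taken from the original project:
// Service A (OrderService.java): starts the global transaction
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.stereotype.Service;

@Service
public class OrderService {

    private final StorageClient storageClient; // hypothetical Feign client pointing at Service B

    public OrderService(StorageClient storageClient) {
        this.storageClient = storageClient;
    }

    // Opens a global transaction; the XID is propagated to Service B on the Feign call
    @GlobalTransactional(rollbackFor = Exception.class)
    public void createOrder(Long productId, int count) {
        // ... local writes of Service A (its datasource is proxied by Seata and records undo_log) ...
        storageClient.deduct(productId, count); // remote branch executed in Service B
        // any exception thrown from here on rolls back both the local and the remote branch
    }
}

// Service B (StorageService.java): an ordinary local transaction that joins the global one
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class StorageService {

    @Transactional(rollbackFor = Exception.class)
    public void deduct(Long productId, int count) {
        // ... local writes of Service B, recorded in its own undo_log table ...
    }
}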
And we're done!