Mysql分布式事务,前提条件
MySQL中只有当隔离级别设置为Serializable的时候才能使用分布式事务。
执行两个命令确认环境
show variables like 'innodb_support_xa';
show variables like '%tx_iso%';
异常信息
根据官方分布式示例
public function test()
{
Db::transactionXa(function () {
$updateTime = time();
Db::connect('cdpf_driver')->table('cdpf_driver_user_login_his')->where(["id" => 3128])->update(["login_time" => $updateTime]);
Db::connect('cheduo')->table('users')->where(["id" => 2])->update(["updated_at" => date("Y-m-d H:i:s", $updateTime)]);
}, [Db::connect('cdpf_driver'),Db::connect('cheduo')]);
}
如果是同一个服务,执行会出现
异常提示:
SQLSTATE[XAE08]: <>: 1440 XAER_DUPID: The XID already exists
解决办法:
方案一:单独编写,不要用thinkphp封装好的
public function test()
{
$xid = uniqid('xa');
$updateTime = time();
// 第一个数据库连接
$cdpfDriverXid = "cdpf_driver_{$xid}";
$cdpfDriverDb = Db::connect('cdpf_driver');
$cdpfDriverConnect = $cdpfDriverDb->connect();
// 第二个数据库连接
$cheduoXid = "cheduo_{$xid}";
$cheduoDb = Db::connect('cheduo');
$cheduoConnect = $cheduoDb->connect();
try {
// 第一个事务开始
$cdpfDriverConnect->exec("XA begin '$cdpfDriverXid'");
$cdpfDriverDb->table('cdpf_driver_user_login_his')->where(["id" => 3128])->update(["login_time" => $updateTime]);
$cdpfDriverConnect->exec("XA END '$cdpfDriverXid'");
// 到这里挂起
// 第二个事务开始
$cheduoConnect->exec("XA begin '$cheduoXid'");
$cheduoDb->table('users')->where(["id" => 2])->update(["updated_at" => date("Y-m-d H:i:s", $updateTime)]);
$cheduoConnect->exec("XA END '$cheduoXid'");
// 到这里挂起
// 预执行与提交
$cdpfDriverConnect->exec("XA PREPARE '$cdpfDriverXid'");
$cdpfDriverConnect->exec("XA COMMIT '$cdpfDriverXid'");
$cheduoConnect->exec("XA PREPARE '$cheduoXid'");
$cheduoConnect->exec("XA COMMIT '$cheduoXid'");
} catch (\Exception $ex) {
echo $ex->getMessage();
// 注意:其实如果语句本身存在问题,并没有走到 XA END。这里回滚会抛出异常
$cdpfDriverConnect->exec("XA ROLLBACK '$cdpfDriverXid'");
$cheduoConnect->exec("XA ROLLBACK '$cheduoXid'");
}
return "success";
}
方案二,改动源码
namespace think\db;
/**
* 数据库连接基础类
* @property PDO[] $links
* @property PDO $linkID
* @property PDO $linkRead
* @property PDO $linkWrite
*/
abstract class PDOConnection extends Connection
{
/**
* 执行数据库Xa事务
* @access public
* @param callable $callback 数据操作方法回调
* @param array $dbs 多个查询对象或者连接对象
* @return mixed
* @throws PDOException
* @throws \Exception
* @throws \Throwable
*/
public function transactionXa(callable $callback, array $dbs = [])
{
$xid = uniqid('xa');
if (empty($dbs)) {
$dbs[] = $this;
}
foreach ($dbs as $key => $db) {
if ($db instanceof BaseQuery) {
$db = $db->getConnection();
$dbs[$key] = $db;
}
$db->startTransXa($db->getConfig('hostname').'_'.$db->getConfig('database').'_'.$xid);
}
try {
$result = null;
if (is_callable($callback)) {
$result = $callback($this);
}
foreach ($dbs as $db) {
$db->prepareXa($db->getConfig('hostname').'_'.$db->getConfig('database').'_'.$xid);
}
foreach ($dbs as $db) {
$db->commitXa($db->getConfig('hostname').'_'.$db->getConfig('database').'_'.$xid);
}
return $result;
} catch (\Exception | \Throwable $e) {
foreach ($dbs as $db) {
$db->rollbackXa($db->getConfig('hostname').'_'.$db->getConfig('database').'_'.$xid);
}
throw $e;
}
}
}
下面来分析原因:
其实从错误提醒这里很好得知
分布式唯一ID已经存在了
跟踪源码
namespace think\db;
/**
* 数据库连接基础类
* @property PDO[] $links
* @property PDO $linkID
* @property PDO $linkRead
* @property PDO $linkWrite
*/
abstract class PDOConnection extends Connection
{
/**
* 执行数据库Xa事务
* @access public
* @param callable $callback 数据操作方法回调
* @param array $dbs 多个查询对象或者连接对象
* @return mixed
* @throws PDOException
* @throws \Exception
* @throws \Throwable
*/
public function transactionXa(callable $callback, array $dbs = [])
{
$xid = uniqid('xa');
if (empty($dbs)) {
$dbs[] = $this;
}
foreach ($dbs as $key => $db) {
if ($db instanceof BaseQuery) {
$db = $db->getConnection();
$dbs[$key] = $db;
}
// 问题就在这里,同一个数据库服务器,一个事务ID还没执行完毕
// 又循环生成同样的事务ID,导致异常
$db->startTransXa($xid);
}
try {
$result = null;
if (is_callable($callback)) {
$result = $callback($this);
}
foreach ($dbs as $db) {
$db->prepareXa($xid);
}
foreach ($dbs as $db) {
$db->commitXa($xid);
}
return $result;
} catch (\Exception | \Throwable $e) {
foreach ($dbs as $db) {
$db->rollbackXa($xid);
}
throw $e;
}
}
}