target:离开柬埔寨倒计时-218day
珍藏的图片又拿出来了
前言
M系统中的撮合引擎是最最核心的功能,第一版的撮合引擎不是我写的,也没有做交易对的动态分配这样的功能,都是基于抢锁方式来决定谁拥有该交易对的撮合权限,所以锁就至关重要了,本来最简单的方法就是只起一个java进程,然后用jdk的锁就不用担心这些问题了,但是当交易对多的时候,一个进程就不一定能及时的处理这些订单,所以还是需要多台机器同步进行处理,所以还是需要分布式锁。
我接触到的最初版本
我初次接触这个系统是在2019年初
记得是在那年4月还是5月的时候,发生了一个异常,同一个订单撮合了两次,本来那个订单在第一次撮合后就已经全部成交了,所以紧跟着就来了第二笔撮合,那时的负责人让我协助排查这个问题,我就一脸懵的开始了排查之路
- 首先我快速熟悉这套交易流程,让负责人给我讲解;
- 根据交易流程,发现问题出现的原因一定在撮合引擎上面;
- 查看撮合引擎的日志
当时撮合引擎的线程名称是撮合引擎前缀+交易对+编号,排查日志很容易发现其中有两个线程名称和相似,只有编号不一样,交易对是一样的,这就意味着同一个交易对有两个线程在进行撮合,因为这两个线程处于不同的jvm进程内,所以就没办法共享订单簿内存,这样就会出现撮合多次的情况了。
看到这里我不禁心想,这不是锁住了吗,怎么会还出现同一个交易对被两个线程都撮合的情况呢,除非这个锁没有锁住,我先是去查看了加锁的逻辑,加锁使用的是redisson,加锁的key是交易对,所以从逻辑上看是没什么问题的;然后我就继续排查日志,我看到第二台服务器的那个线程产生撮合日志的时间就在几个小时前,属于我就着重去找了那段时间的日志;
从里面的日志我看到了一条很有嫌疑的日志,不能更改锁的过期时间,这时候我隐约知道问题出现的原因了
先来看一段redisson锁里面的一段关键代码片段
类:org.redisson.RedissonLock
// 这个其实就是给redisson锁保活的一个续命任务
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
// 就是这里,如果这里发生了异常,就不会执行下面的对自己的调用
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
// 其实当时key是存在的,只是发生了网络问题,所以没有到这个分支
if (res) {
// reschedule itself
// 每次续命成功才会继续发起下一次的续命
renewExpiration();
}
});
}
// 这里续命时间默认是锁超时时间的1/3,也就是说默认30s的话,会每10s发起一次续命
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
其实这个续命任务在多数场景下都是足以支持的了,像我遇到的这个场景是比较少见的,当然也可以增大锁的超时时间,但是多长的时间能满足呢,这些都是问题,所以基于这个场景我写了个基于mysql的锁来支持这个功能。
Mysql实现简单的分布式锁
首先是一个大的抽象类,实现lock接口
package com.littlehow.lock;
import java.util.Date;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public abstract class DefaultLock implements Lock {
private final String id;
protected final String key;
// 主要是此处存放锁定key和id使用
protected DefaultLock(String key) {
this.id = UUID.randomUUID().toString().replace("-", "");
this.key = key;
}
// 获取锁id
public String getId(long threadId) {
return id + ":" + threadId;
}
public String getKey() {
return key;
}
@Override
public boolean tryLock() {
try {
return tryLock(-1, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return false;
}
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public Condition newCondition() {
return new Condition() {
@Override
public void await() throws InterruptedException {
}
@Override
public void awaitUninterruptibly() {
}
@Override
public long awaitNanos(long nanosTimeout) throws InterruptedException {
return 0;
}
@Override
public boolean await(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public boolean awaitUntil(Date deadline) throws InterruptedException {
return false;
}
@Override
public void signal() {
}
@Override
public void signalAll() {
}
};
}
}
真正实现逻辑的分布式锁实现类
package com.littlehow.lock;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class DistributedLock extends DefaultLock {
private final static Map<String, ScheduledFuture> scheduleFuture = new ConcurrentHashMap<>();
private final static AtomicInteger threadId = new AtomicInteger(1);
// 使用默认的拒绝策略AbortPolicy 新任务来了抛出拒绝异常即可
private final ExecutorService pool = new ThreadPoolExecutor(8, 20, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100000),
r -> new Thread(r, "littlehow-lock-" + threadId.getAndIncrement()));
private final ScheduledExecutorService schedule = Executors.newSingleThreadScheduledExecutor((r) -> new Thread(r,"DistributedLock-thread"));
private static final long defaultTimeout = 60000L;
private final long expired;
private ContinueLife continueLife;
private final LockService lockService;
public DistributedLock(long expired, LockService lockService, String key) {
this(expired, null, lockService, key);
}
public DistributedLock(long expired, ContinueLife continueLife, LockService lockService, String key) {
super(key);
this.expired = expired;
this.continueLife = continueLife;
this.lockService = lockService;
}
@Override
public void lockInterruptibly() {
tryLock(-1, TimeUnit.MILLISECONDS);
}
/**
* 如果要实现重入,可以在这里获取锁成功后计数到ThreadLocal,不用考虑计数失败,因为在这里操作计数失败只能是发生了不可控的异常
* 想要保证原子性的话,计数就可以放到底层,如mysql表这些来设置,此处因为没有重入的需求,所以就没有实现加锁去锁的计数
*/
@Override
public boolean tryLock(long time, TimeUnit unit) {
final String id = getId(Thread.currentThread().getId());
// 这里实际上使用的ip获取工具获取的,此处就写死
String ip = "192.168.1.1";
log.debug("get lock key={}, id={}, ip={}", key, id, ip);
Future<Boolean> future = pool.submit(() -> this.lockService.tryLock(this.key, id, System.currentTimeMillis() + expired, ip));
try {
boolean lock = future.get(time == -1L ? defaultTimeout : time, unit);
if (lock && continueLife != null) {
final String cacheKey = key + "-" + id;
if (!scheduleFuture.containsKey(cacheKey)) {
ScheduledFuture taskFuture = schedule.scheduleWithFixedDelay(() -> {
boolean flag = this.continueLife.flushLife(key, id, System.currentTimeMillis() + expired) ;
//如果续命返回false,则会清除续命任务
if (!flag) {
cancelContinueTask(cacheKey);
}
},
expired / 3, expired / 3, TimeUnit.MILLISECONDS);
scheduleFuture.put(cacheKey, taskFuture);
}
}
return lock;
} catch (Exception e) {
log.debug("get lock fail key={} id={} message={}", key, getId(Thread.currentThread().getId()), e.getMessage());
}
return false;
}
@Override
public void unlock() {
String id = getId(Thread.currentThread().getId());
try {
log.info("unlock key={}, id={}", key, id);
this.lockService.unlock(this.key, id);
} catch (Throwable t) {
log.error("解锁异常", t);
cancelContinueTask(key + "-" + id);
}
}
private void cancelContinueTask(String cacheKey) {
//停止相应的续命任务
ScheduledFuture tf = scheduleFuture.get(cacheKey);
if (tf == null) return;
log.info("continue life fail key={}", key);
tf.cancel(true);
log.info("clear task key={}, result={}", key, tf.isCancelled());
scheduleFuture.remove(cacheKey);
}
}
下面是锁接口和续命接口
package com.littlehow.lock;
public interface LockService {
// 获取锁
boolean tryLock(String key, String id, long expired, String ip);
// 解锁
void unlock(String key, String id);
}
=====================================================================================
package com.littlehow.lock;
public interface ContinueLife {
// 刷新过期时间
boolean flushLife(String key, String id, long time);
}
然后是mysql实现的一套锁,基于上面的基础接口和类
package com.littlehow.lock.support.mysql;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
/**
* @author littlehow
* @since 5/28/24 19:43
*/
@Setter
@Getter
@Accessors(chain = true)
public class LockModel {
/**
* 锁的关键key
*/
private String key;
/**
* 锁的机器ip地址
*/
private String ip;
/**
* 锁的实际id
*/
private String lockId;
/**
* 锁的过期时间
*/
private Long expireTime;
}
=====================================================================================
package com.littlehow.lock.support.mysql;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
@Component
@Slf4j
public class MysqlLockSupport {
@Value("${lock.warn.time:30000}")
private long warnTime;
public boolean tryLock(String key, String id, long expired, String ip) {
Assert.hasText(id, "lock id must be not null");
LockModel lockModel = new LockModel().setLockId(id).setExpireTime(expired)
.setKey(key).setIp(ip);
// mysql实际实现细节就不具体写出来了,下面就写个伪代码
// 实际代码是去数据库拉取信息,然后根据数据库信息进行下面的判定
LockModel dbLock = lockModel;
if (dbLock == null) {
// 进行保存,保存成功才返回true,否则返回false,对唯一约束异常也要做保存失败处理
return true;
} else if (id.equals(dbLock.getLockId())) {
// 同一个线程获取两次锁,直接返回true
// 重入逻辑可以在上层使用ThreadLocal实现,这里就不实现数据库的计数了
return true;
} else {
// 这里就是其他线程在对此进行抢锁操作
// 如果时间超过了配置的警告时间,则进行错误日志答应,报警处理
if (System.currentTimeMillis() - warnTime > dbLock.getExpireTime()) {
log.error("key {} deadlock for {}, ip address {}", key, dbLock.getLockId(), dbLock.getIp());
}
}
return false;
}
public void unlock(String key, String id) {
// 如果支持重入的锁,那么上层逻辑一定要减去对应的值,最终等于1才调用此处的逻辑
// 此处的代码就相当于是更新三个值,一个锁的过期时间,一个是锁的lockId。一个是ip地址,都进行置空处理
// 因为这个是为撮合引擎定制的锁,所以这个key才不进行删除,因为此处的key就相当于是交易对,这些交易对基本都是固定的,只会增加,基本不会出现减少的情况
}
public boolean updateLockExpired(String key, String id, long time) {
log.info("start continue life key={}, id={}, time={}", key, id, time);
// 这里是更新锁的续命时间, 如果更新续命时间成功,则返回true即可
return true;
}
}
=====================================================================================
package com.littlehow.lock.support.mysql;
import com.littlehow.lock.LockService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MysqlLockService implements LockService {
@Autowired
private MysqlLockSupport lockSupport;
@Override
public boolean tryLock(String key, String id, long expired, String ip) {
try {
return lockSupport.tryLock(key, id, expired, ip);
} catch (Throwable t) {
log.error("获取锁异常", t);
return false;
}
}
@Override
public void unlock(String key, String id) {
lockSupport.unlock(key, id);
}
}
=====================================================================================
package com.littlehow.lock.support.mysql;
import com.littlehow.lock.ContinueLife;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MysqlContinueLife implements ContinueLife {
@Autowired
private MysqlLockSupport lockSupport;
@Override
public boolean flushLife(String key, String id, long time) {
try {
return lockSupport.updateLockExpired(key, id, time);
} catch (Throwable t) {//出现异常返回true,下次续命任务会继续进行
log.error("锁续命异常", t);
}
return true;
}
}
=====================================================================================
package com.littlehow.lock.support.mysql;
import com.littlehow.lock.DistributedLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
@Component
public class MysqlLockFactory {
@Value("${lock.expired:30000}")
private long expired;
@Autowired
private MysqlContinueLife continueLife;
@Autowired
private MysqlLockService lockService;
private static final Map<String, Lock> locks = new HashMap<>();
/**
* 获取锁信息
* @param key
* @return
*/
public Lock getLock(String key) {
Lock lock = locks.get(key);
if (lock == null) {
synchronized (this) {
lock = locks.get(key);
if (lock == null) {
lock = new DistributedLock(expired, continueLife, lockService, key);
locks.put(key, lock);
}
}
}
return lock;
}
}
然后就是调用了
package com.littlehow.lock;
import com.littlehow.lock.support.mysql.MysqlLockFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.concurrent.locks.Lock;
/**
* @author littlehow
* @since 5/28/24 20:04
*/
@Slf4j
public class TestLock {
@Autowired
private MysqlLockFactory mysqlLockFactory;
/**
* 这里可以使用junit进行测试调用
*/
public void test() {
Lock lock = mysqlLockFactory.getLock("USD/CNY");
try {
if (lock.tryLock()) {
// 已经获取到锁,可以进行业务处理
} else {
log.info("获取锁失败");
}
} finally {
lock.unlock();
}
}
}
所以整个锁的获取流程图如下
后记
这几天很忙很忙,差点就中断制定的日更博客了,做M功能时的苦难感情戏本来就要登场的,结果一直酝酿不出当时的情绪,感觉写不好,所以就先更新一些我在M项目里面做的一些事情,也算是解析了一点点分布式锁在超长事务里面使用的一些注意事项吧!
今天又看到别人在翻新自己的“沙滩排球”场地,有时候真的羡慕他们呀,没有那么卷的生活,每天都开开心心,还能忙里偷闲做自己喜欢做的事情!
加油吧littlehow
北京时间:2024-05-28 21:10
金边时间:2024-05-28 20:10