M功能-分布式锁-支付平台(五)

news2024/11/17 7:27:12

target:离开柬埔寨倒计时-218day

在这里插入图片描述

珍藏的图片又拿出来了

前言

M系统中的撮合引擎是最最核心的功能,第一版的撮合引擎不是我写的,也没有做交易对的动态分配这样的功能,都是基于抢锁方式来决定谁拥有该交易对的撮合权限,所以锁就至关重要了,本来最简单的方法就是只起一个java进程,然后用jdk的锁就不用担心这些问题了,但是当交易对多的时候,一个进程就不一定能及时的处理这些订单,所以还是需要多台机器同步进行处理,所以还是需要分布式锁。

我接触到的最初版本

我初次接触这个系统是在2019年初

记得是在那年4月还是5月的时候,发生了一个异常,同一个订单撮合了两次,本来那个订单在第一次撮合后就已经全部成交了,所以紧跟着就来了第二笔撮合,那时的负责人让我协助排查这个问题,我就一脸懵的开始了排查之路

  • 首先我快速熟悉这套交易流程,让负责人给我讲解;
  • 根据交易流程,发现问题出现的原因一定在撮合引擎上面;
  • 查看撮合引擎的日志

当时撮合引擎的线程名称是撮合引擎前缀+交易对+编号,排查日志很容易发现其中有两个线程名称和相似,只有编号不一样,交易对是一样的,这就意味着同一个交易对有两个线程在进行撮合,因为这两个线程处于不同的jvm进程内,所以就没办法共享订单簿内存,这样就会出现撮合多次的情况了。

看到这里我不禁心想,这不是锁住了吗,怎么会还出现同一个交易对被两个线程都撮合的情况呢,除非这个锁没有锁住,我先是去查看了加锁的逻辑,加锁使用的是redisson,加锁的key是交易对,所以从逻辑上看是没什么问题的;然后我就继续排查日志,我看到第二台服务器的那个线程产生撮合日志的时间就在几个小时前,属于我就着重去找了那段时间的日志;

从里面的日志我看到了一条很有嫌疑的日志,不能更改锁的过期时间,这时候我隐约知道问题出现的原因了

先来看一段redisson锁里面的一段关键代码片段

类:org.redisson.RedissonLock

// 这个其实就是给redisson锁保活的一个续命任务
private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    // 就是这里,如果这里发生了异常,就不会执行下面的对自己的调用
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    }
                    // 其实当时key是存在的,只是发生了网络问题,所以没有到这个分支
                    if (res) {
                        // reschedule itself
                        // 每次续命成功才会继续发起下一次的续命
                        renewExpiration();
                    }
                });
            }
           // 这里续命时间默认是锁超时时间的1/3,也就是说默认30s的话,会每10s发起一次续命
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }

其实这个续命任务在多数场景下都是足以支持的了,像我遇到的这个场景是比较少见的,当然也可以增大锁的超时时间,但是多长的时间能满足呢,这些都是问题,所以基于这个场景我写了个基于mysql的锁来支持这个功能。

Mysql实现简单的分布式锁

首先是一个大的抽象类,实现lock接口

package com.littlehow.lock;

import java.util.Date;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public abstract class DefaultLock implements Lock {

    private final String id;
    protected final String key;
	// 主要是此处存放锁定key和id使用
    protected DefaultLock(String key) {
        this.id = UUID.randomUUID().toString().replace("-", "");
        this.key = key;
    }

    // 获取锁id
    public String getId(long threadId) {
        return id + ":" + threadId;
    }

    public String getKey() {
        return key;
    }

    @Override
    public boolean tryLock() {
        try {
            return tryLock(-1, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

    @Override
    public void lock() {
        try {
            lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public Condition newCondition() {
        return new Condition() {
            @Override
            public void await() throws InterruptedException {

            }

            @Override
            public void awaitUninterruptibly() {

            }

            @Override
            public long awaitNanos(long nanosTimeout) throws InterruptedException {
                return 0;
            }

            @Override
            public boolean await(long time, TimeUnit unit) throws InterruptedException {
                return false;
            }

            @Override
            public boolean awaitUntil(Date deadline) throws InterruptedException {
                return false;
            }

            @Override
            public void signal() {

            }

            @Override
            public void signalAll() {

            }
        };
    }
}

真正实现逻辑的分布式锁实现类

package com.littlehow.lock;

import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
public class DistributedLock extends DefaultLock {
    private final static Map<String, ScheduledFuture> scheduleFuture = new ConcurrentHashMap<>();
    private final static AtomicInteger threadId = new AtomicInteger(1);
    // 使用默认的拒绝策略AbortPolicy 新任务来了抛出拒绝异常即可
    private final ExecutorService pool = new ThreadPoolExecutor(8, 20, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100000),
            r -> new Thread(r, "littlehow-lock-" + threadId.getAndIncrement()));

    private final ScheduledExecutorService schedule = Executors.newSingleThreadScheduledExecutor((r) -> new Thread(r,"DistributedLock-thread"));
    private static final long defaultTimeout = 60000L;
    private final long expired;
    private ContinueLife continueLife;
    private final LockService lockService;

    public DistributedLock(long expired, LockService lockService, String key) {
        this(expired, null, lockService, key);
    }

    public DistributedLock(long expired, ContinueLife continueLife, LockService lockService, String key) {
        super(key);
        this.expired = expired;
        this.continueLife = continueLife;
        this.lockService = lockService;
    }

    @Override
    public void lockInterruptibly() {
        tryLock(-1, TimeUnit.MILLISECONDS);
    }

    /**
     * 如果要实现重入,可以在这里获取锁成功后计数到ThreadLocal,不用考虑计数失败,因为在这里操作计数失败只能是发生了不可控的异常
     * 想要保证原子性的话,计数就可以放到底层,如mysql表这些来设置,此处因为没有重入的需求,所以就没有实现加锁去锁的计数
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) {
        final String id = getId(Thread.currentThread().getId());
        // 这里实际上使用的ip获取工具获取的,此处就写死
        String ip = "192.168.1.1";
        log.debug("get lock key={}, id={}, ip={}", key, id, ip);
        Future<Boolean> future = pool.submit(() ->  this.lockService.tryLock(this.key, id, System.currentTimeMillis() + expired, ip));
        try {
            boolean lock = future.get(time == -1L ? defaultTimeout : time, unit);
            if (lock && continueLife != null) {
                final String cacheKey = key + "-" + id;
                if (!scheduleFuture.containsKey(cacheKey)) {
                    ScheduledFuture taskFuture = schedule.scheduleWithFixedDelay(() -> {
                            boolean flag = this.continueLife.flushLife(key, id, System.currentTimeMillis() + expired) ;
                            //如果续命返回false,则会清除续命任务
                            if (!flag) {
                                cancelContinueTask(cacheKey);
                            }
                        },
                        expired / 3, expired / 3, TimeUnit.MILLISECONDS);
                    scheduleFuture.put(cacheKey, taskFuture);
                }
            }
            return lock;
        } catch (Exception e) {
            log.debug("get lock fail key={} id={} message={}", key, getId(Thread.currentThread().getId()), e.getMessage());
        }
        return false;
    }

    @Override
    public void unlock() {
        String id = getId(Thread.currentThread().getId());
        try {
            log.info("unlock key={}, id={}", key, id);
            this.lockService.unlock(this.key, id);
        } catch (Throwable t) {
            log.error("解锁异常", t);
            cancelContinueTask(key + "-" + id);
        }
    }

    private void cancelContinueTask(String cacheKey) {
        //停止相应的续命任务
        ScheduledFuture tf = scheduleFuture.get(cacheKey);
        if (tf == null) return;
        log.info("continue life fail key={}", key);
        tf.cancel(true);
        log.info("clear task key={}, result={}", key, tf.isCancelled());
        scheduleFuture.remove(cacheKey);
    }
}

下面是锁接口和续命接口

package com.littlehow.lock;

public interface LockService {
    // 获取锁
    boolean tryLock(String key, String id, long expired, String ip);
	
    // 解锁
    void unlock(String key, String id);
}

=====================================================================================

package com.littlehow.lock;

public interface ContinueLife {
    // 刷新过期时间
    boolean flushLife(String key, String id, long time);
}

然后是mysql实现的一套锁,基于上面的基础接口和类

package com.littlehow.lock.support.mysql;

import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;

/**
 * @author littlehow
 * @since 5/28/24 19:43
 */
@Setter
@Getter
@Accessors(chain = true)
public class LockModel {
    /**
     * 锁的关键key
     */
    private String key;

    /**
     * 锁的机器ip地址
     */
    private String ip;

    /**
     * 锁的实际id
     */
    private String lockId;

    /**
     * 锁的过期时间
     */
    private Long expireTime;
}

=====================================================================================
package com.littlehow.lock.support.mysql;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;


@Component
@Slf4j
public class MysqlLockSupport {
    @Value("${lock.warn.time:30000}")
    private long warnTime;

    public boolean tryLock(String key, String id, long expired, String ip) {
        Assert.hasText(id, "lock id must be not null");
        LockModel lockModel = new LockModel().setLockId(id).setExpireTime(expired)
                .setKey(key).setIp(ip);
        // mysql实际实现细节就不具体写出来了,下面就写个伪代码
        // 实际代码是去数据库拉取信息,然后根据数据库信息进行下面的判定
        LockModel dbLock = lockModel;
        if (dbLock == null) {
            // 进行保存,保存成功才返回true,否则返回false,对唯一约束异常也要做保存失败处理
            return true;
        } else if (id.equals(dbLock.getLockId())) {
            // 同一个线程获取两次锁,直接返回true
            // 重入逻辑可以在上层使用ThreadLocal实现,这里就不实现数据库的计数了
            return true;
        } else {
            // 这里就是其他线程在对此进行抢锁操作
            // 如果时间超过了配置的警告时间,则进行错误日志答应,报警处理
            if (System.currentTimeMillis() - warnTime > dbLock.getExpireTime()) {
                log.error("key {} deadlock for {}, ip address {}", key, dbLock.getLockId(), dbLock.getIp());
            }
        }
        return false;
    }

    public void unlock(String key, String id) {
        // 如果支持重入的锁,那么上层逻辑一定要减去对应的值,最终等于1才调用此处的逻辑
        // 此处的代码就相当于是更新三个值,一个锁的过期时间,一个是锁的lockId。一个是ip地址,都进行置空处理
        // 因为这个是为撮合引擎定制的锁,所以这个key才不进行删除,因为此处的key就相当于是交易对,这些交易对基本都是固定的,只会增加,基本不会出现减少的情况
    }

    public boolean updateLockExpired(String key, String id, long time) {
        log.info("start continue life key={}, id={}, time={}", key, id, time);
        // 这里是更新锁的续命时间, 如果更新续命时间成功,则返回true即可
        return true;
    }
}

=====================================================================================
package com.littlehow.lock.support.mysql;

import com.littlehow.lock.LockService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MysqlLockService implements LockService {

    @Autowired
    private MysqlLockSupport lockSupport;

    @Override
    public boolean tryLock(String key, String id, long expired, String ip) {
        try {
            return lockSupport.tryLock(key, id, expired, ip);
        } catch (Throwable t) {
            log.error("获取锁异常", t);
            return false;
        }
    }

    @Override
    public void unlock(String key, String id) {
        lockSupport.unlock(key, id);
    }

}

=====================================================================================
package com.littlehow.lock.support.mysql;

import com.littlehow.lock.ContinueLife;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MysqlContinueLife implements ContinueLife {

    @Autowired
    private MysqlLockSupport lockSupport;

    @Override
    public boolean flushLife(String key, String id, long time) {
        try {
            return lockSupport.updateLockExpired(key, id, time);
        } catch (Throwable t) {//出现异常返回true,下次续命任务会继续进行
            log.error("锁续命异常", t);
        }
        return true;
    }
}

=====================================================================================
package com.littlehow.lock.support.mysql;

import com.littlehow.lock.DistributedLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;

@Component
public class MysqlLockFactory {
    @Value("${lock.expired:30000}")
    private long expired;

    @Autowired
    private MysqlContinueLife continueLife;

    @Autowired
    private MysqlLockService lockService;

    private static final Map<String, Lock> locks = new HashMap<>();

    /**
     * 获取锁信息
     * @param key
     * @return
     */
    public Lock getLock(String key) {
        Lock lock = locks.get(key);
        if (lock == null) {
            synchronized (this) {
                lock = locks.get(key);
                if (lock == null) {
                    lock = new DistributedLock(expired, continueLife, lockService, key);
                    locks.put(key, lock);
                }
            }
        }
        return lock;
    }
}

然后就是调用了

package com.littlehow.lock;

import com.littlehow.lock.support.mysql.MysqlLockFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.concurrent.locks.Lock;


/**
 * @author littlehow
 * @since 5/28/24 20:04
 */
@Slf4j
public class TestLock {

    @Autowired
    private MysqlLockFactory mysqlLockFactory;

    /**
     * 这里可以使用junit进行测试调用
     */
    public void test() {
        Lock lock = mysqlLockFactory.getLock("USD/CNY");
        try {
            if (lock.tryLock()) {
                // 已经获取到锁,可以进行业务处理
            } else {
                log.info("获取锁失败");
            }
        } finally {
            lock.unlock();
        }
    }
}

所以整个锁的获取流程图如下

在这里插入图片描述

后记

这几天很忙很忙,差点就中断制定的日更博客了,做M功能时的苦难感情戏本来就要登场的,结果一直酝酿不出当时的情绪,感觉写不好,所以就先更新一些我在M项目里面做的一些事情,也算是解析了一点点分布式锁在超长事务里面使用的一些注意事项吧!

今天又看到别人在翻新自己的“沙滩排球”场地,有时候真的羡慕他们呀,没有那么卷的生活,每天都开开心心,还能忙里偷闲做自己喜欢做的事情!
在这里插入图片描述

加油吧littlehow
北京时间:2024-05-28 21:10

金边时间:2024-05-28 20:10

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1711082.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Qt秘籍】[003]-Qt环境变量配置-磨刀不误砍柴工

一、为什么要设置环境变量 &#xff1f;[原因] 配置PATH环境变量的主要用处在于让操作系统能够识别并执行不在当前工作目录下的可执行文件。具体来说&#xff0c;它的作用包括&#xff1a; 命令执行便捷性&#xff1a;当你在命令行输入一个命令&#xff08;如java, python或np…

Collection(一)[集合体系]

说明&#xff1a;Collection代表单列集合&#xff0c;每个元素&#xff08;数据&#xff09;只包含一个值。 Collection集合体系&#xff1a; Collection<E> 接口 (一&#xff09;List<E> 接口 说明&#xff1a;添加的元素是有序、可重复、有索引。 1. ArrayLi…

在table中获取每一行scope的值

目的 当前有一份如下数据需要展示在表格中&#xff0c;表格的页面元素套了一个折叠面板&#xff0c;需要循环page_elements中的数据展示出来 错误实践 将template放在了折叠面板中&#xff0c;获取到的scope是空数组 <el-table-column label"页面元素" show-o…

【技术实操】银河高级服务器操作系统实例分享,达梦数据库服务器 oom 问题分析

1. 服务器环境以及配置 【 机型】 处理器&#xff1a; HUAWEIKunpeng 920 5220 内存&#xff1a; 400518528 kB 主板型号&#xff1a; Chaoqiang K620 series 整机类型/架构&#xff1a; ARM BIOS 版本&#xff1a; KL4.41.028.TF.220224.R 固件版本&#xff1a; KL4.41…

nginx源码阅读理解 [持续更新,建议关注]

文章目录 前述一、nginx 进程模型基本流程二、源码里的小点1.对字符串操作都进行了原生实现2.配置文件解析也是原生实现待续 前述 通过对 nginx 的了解和代码简单阅读&#xff0c;发现这个C代码的中间件确实存在过人之处&#xff0c;使用场景特别多&#xff0c;插件模块很丰富…

加密资产私钥安全完整手册(一) ,bitget钱包为例

比特币和以太坊等加密货币的兴起开创了数字金融的新时代&#xff0c;但也带来了独特的安全挑战。这些代表现实世界价值的数字资产已成为黑客和窃贼的主要目标。为了安全地应对这种情况&#xff0c;了解私钥的基本概念至关重要。 私钥是加密货币所有权和安全性的基石。它们相当于…

三维天地参编《数据要素流通标准化白皮书(2024版)》正式发布

近日,在福州举行的第七届数字中国建设峰会数据标准化和数据基础设施分论坛-数据标准化专场举行。国家数据局局长刘烈宏、福建省政府党组成员李兴湖出席并致辞。 该论坛由国家数据局主办、中国电子技术标准化研究院承办,围绕“标准引领 数创未来”主题,分享实践经验,明晰建设路…

故障诊断 | 基于KAN故障诊断模型

效果一览 文章概述 故障诊断 | 基于 KAN故障诊断模型。KAN是一种全新的神经网络架构&#xff0c;它与传统的MLP架构不同&#xff0c;能够用更少的参数量在Science领域取得惊人的表现&#xff0c;并且具备可解释性&#xff0c;有望成为深度学习模型发展的一个重要方向。运用KAN&…

网页上怎么打开iPhone手机上的备忘录 备忘录网页端打开方式

我经常使用iPhone的备忘录功能&#xff0c;随手记录生活中的点点滴滴&#xff0c;工作中的待办事项。然而&#xff0c;有时候&#xff0c;当我坐在电脑前&#xff0c;想要快速查看或编辑备忘录内容时&#xff0c;手机的小屏幕就显得不那么方便了。那么&#xff0c;如何在电脑上…

scrapy 整合 mitm

1.mitm 是什么 MITMproxy 是一个开源的中间人代理&#xff0c;常用于网络流量的拦截、查看和修改。 2.scrapy 整合 mitm步骤 2.1 安装mitm PS F:\studyScrapy\itcastScrapy> pip install mitmproxy2.2 在settings 中配置下载器中间件 # settings.pyDOWNLOADER_MIDDLEWARES…

如何将音频中的人声分离出来?

想要把一段视频中的人声跟背景音乐分离开来&#xff0c;找个好一点的音频处理软件就能把声音分离了&#xff0c;常见的有以下方法&#xff0c;一起来看看吧。 pr 打开软件&#xff0c;然后将电脑上的音频文件&#xff0c;上传到软件中&#xff0c;然后按住[ctrla]选择所有音频…

【硬核测评】猫咪主食冻干测评揭秘SC、希喂、爱立方真实对比测评

主食冻干喂养是否必要&#xff1f; 来自七年经验的铲屎官明确告诉你&#xff0c;这是非常必要的喂养方式&#xff01; 随着宠物经济的蓬勃发展和科学养宠知识的普及&#xff0c;如今养猫已不仅仅是让猫咪吃饱那么简单。越来越多的养猫人开始重视猫咪的饮食健康。大量实际喂养案…

SpringBoot之@AutoConfigureBefore、@AutoConfigureAfter、@AutoConfigureOrder注解

前言 SpringBoot通过AutoConfigureOrder、AutoConfigureBefore、AutoConfigureAfter注解&#xff0c;控制自动配置类的实例化顺序。 Spring中控制Bean的实例化顺序 Spring中默认实例化顺序 创建实体类A、B、C Component public class A {public A() {System.out.println(&…

小熊家务帮day5 客户管理模块1 (小程序认证,手机验证码认证等)

客户管理模块 1.认证模块1.1 认证方式介绍1.1.1 小程序认证1.1.2 手机验证码登录1.1.3 账号密码认证 1.2 小程序认证1.2.1 小程序申请1.2.2 创建客户后端工程jzo2o-customer1.2.3 开发部署前端1.2.4 小程序认证流程1.2.4.1 customer小程序认证接口设计Controller层Service层调用…

The 2022 ICPC Asia Nanjing Regional Contest - External G

题目链接:Problem - D - Codeforces 写在前面&#xff1a;今天的训练赛打的稀碎&#xff0c;一道稍微难一点的签到题就把我难住了&#xff0c;看完题解确实感觉不难&#xff0c;看来题目还是刷太少了。 回归正题 题意&#xff1a; 思路&#xff1a;尽量让分子大&#xff0c;分…

操作系统实验三 可变分区内存分配首次适应算法模拟

实验三 可变分区内存分配首次适应算法模拟 实验内容 模拟内存分配&#xff0c;了解并掌握动态分区分配中所用的数据结构、分区分配算法&#xff0c;深刻理解首次适应内存分配算法。 模拟实现可变分区内存分配首次适应算法&#xff1b;空闲分区表要求有空闲块的起始地址、大小…

LLama学习记录

学习前&#xff1a; 五大问题&#xff1a; 为什么SwiGLU激活函数能够提升模型性能&#xff1f;RoPE位置编码是什么&#xff1f;怎么用的&#xff1f;还有哪些位置编码方式&#xff1f;GQA&#xff08;Grouped-Query Attention, GQA&#xff09;分组查询注意力机制是什么&…

使用Python发送电子邮件

大家好&#xff0c;当我们需要迅速、方便地与他人沟通时&#xff0c;电子邮件是无疑是一种不可或缺的通信工具。无论是在个人生活中还是工作场合&#xff0c;电子邮件都是我们日常生活中的重要组成部分。它不仅能够传递文字信息&#xff0c;还可以发送附件、链接和嵌入式多媒体…

AJAX基础知识

定义 Ajax 异步 JavaScript 和 XML &#xff08; async javascript and xml &#xff09;&#xff0c;使用 Ajax 技术网页应用能够快速地将数据更新呈现在用户界面上&#xff0c;而不需要重载&#xff08;刷新&#xff09;整个页面&#xff0c;这使得程序能够更快地回应用户的操…

6、xss-labs之level8

1、测试分析 传入123查看页面源码&#xff0c;发现传入的值传给了value和a标签的href&#xff0c;并且对特殊字符<>" 都进行了HTML实体化&#xff0c;对于大小写进行了转化&#xff0c;过滤掉了src、data、onfocus、href、script、"&#xff08;双引号&#…