分布式事务AP控制方案(上)

news2024/12/25 12:27:52

分布式事务控制方案

本篇文章给出一种要求高可用性(AP思想)的分布式事务控制方案

下篇新鲜出炉:点我查看

  • 分布式事务控制方案
  • 1、业务背景
  • 2、本地消息表的设计
  • 3、对消息表的操作
  • 4、任务调度
  • 5、任务流程控制的抽象类
  • 6、课程发布的实现类
  • 7、总结

1、业务背景

业务背景:在线学习平台,教学机构在上传课程时,需要将课程内容同步到数据库,缓存,文件系统,搜索系统,这里需要用到分布式事务,来确保四个组件的业务顺利完成。

CAP理论中,分布式系统只能满足一致性C、可用性A、分区容错性P三者中的两个,由于分布式系统天然要求分区容错,否则就是单体项目,所以只能选CP或AP

其中CP可以使用Seata框架基于AT和TCC模式去实现,AP也有多种实现方式。

我们的业务背景中这四个组件并不要求强一致性,而是要求高可用性,如果其中某个组件没有完成数据同步,那之前已经完成的组件不必回退到事务开始前的状态,所以我们实现AP思想,采用本地消息表+任务调度完成最终一致性

具体的项目环境:SpringBoot框架,数据库MySQL,使用MyBatis-Plus快速开发,缓存Redis,分布式文件系统MinIO,搜索系统ES

2、本地消息表的设计

在业务背景中,如果用户要进行课程发布,我们向MySQL中的消息表里插入一条记录,记录中应当包含四个组件(MySQL,Redis,MinIO,ES)完成的状态,如果四个组件全部完成,就删除这条记录,向历史消息表中插入一条记录,如果四个组件有哪个没有完成,通过查询记录就可以从未完成的地方重新进行数据同步,从而实现最终一致性。

我们除了课程这个业务场景,还会在其他业务场景执行相似的业务,所以我们要考虑如何进行代码的抽象、封装和复用。

我们发现在消息表中没有必要注明每个具体的组件,而是通过小任务一,小任务二…的方式设计数据表,具体的业务逻辑由具体的业务代码实现,而具体的业务代码通过继承抽象的类,来实现对数据库的控制。

设计一个抽象的类,这个类应当实现对数据表的处理,并提供一个接口,让具体的业务代码实现这个接口。

3、对消息表的操作

首先创建数据库和数据表,表中字段包括业务相关字段(消息类型代码,关联业务信息、代码等等),小任务的状态、上一次成功失败时间、重试次数(暂定五个小任务,提高适用性)。

其次创建一个微服务模块,添加MyBatis-Plus依赖和配置

<!-- MySQL 驱动 -->
<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<scope>runtime</scope>
</dependency>
<!-- mybatis plus的依赖 -->
<dependency>
	<groupId>com.baomidou</groupId>
	<artifactId>mybatis-plus-boot-starter</artifactId>
</dependency>

spring:
  application:
    name: service
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/table?serverTimezone=UTC&userUnicode=true&useSSL=false&
    username: xxxx
    password: xxxx

实现DAO和service层开发
在这里插入图片描述

在实现流程控制的抽象类之前我们要思考一个问题,具体的业务代码在实现了任务后,如何开启调用任务的执行?

首先肯定不是在发布课程的方法中直接调用方法,这是同步调用,并且只能执行一次,不适合AP思想的分布式事务。对于数据同步实时性要求不高的技术解决方案有很多,例如MQ、Canal、Logstash、任务调度等等

我们可以在插入数据表后,向消息队列添加一条消息,消费者收到消息后检查数据库是否存在对应记录,没有就执行一次任务,如果任务执行失败,就像消息队列添加一条消息。

我们也可以使用中间件canal,解决耦合性问题,canal通过模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 请求,得到 binglog 日志,

我们也可以通过任务调度的方式执行任务,在任务流程代码中查询数据库,根据数据库中的记录执行任务。

这里我们采用任务调度的方式执行任务。

4、任务调度

任务调度是指对计算任务进行合理安排和调度的过程。分布式任务调度是指在分布式系统中,将任务分割成若干份,根据调度规则交由不同的实例并行执行。

XXL-JOB,是一个轻量级分布式任务调度平台,开发迅速,学习简单,易扩展。包含调度中心(管理执行器、任务、日志,监控运维),执行器(注册服务,执行服务,执行结果上报,记录日志)和任务(具体的业务代码)。

如何确保任务不重复执行?任务调度采用分片广播的方式,查询数据表得到任务的id(自增id),模上分片总数,如果等于当前执行器的任务号,就执行该任务,否则不执行,对于任务分配超出执行器执行能力的情况,通过合理设置任务广播频率,以及设置任务拒绝策略为丢弃任务来确保没有任务被重复执行。

执行流程:

启动XXL-JOB调度中心,创建执行器和任务,任务执行实现通过cron表达式设置为每小时一次,设置分片广播和丢弃策略。

在课程发布微服务中添加XXL-JOB的依赖、配置文件和配置类,创建任务方法,在该方法前添加注解@XxlJob(“JobHandler”)

@XxlJob("JobHandler")
public void coursePublishJobHandler() throws Exception {
	// 分片参数
	int shardIndex = XxlJobHelper.getShardIndex();
	int shardTotal = XxlJobHelper.getShardTotal();
	// 在下一节中实现process,这里仅是测试方法
	System.out.println("XXL-JOB任务调度测试成功");
	// process(......);
}

启动微服务,可以在物理地址中看到一个实例,执行一次任务,在输出窗口看到
在这里插入图片描述

我们可以发现,任务的触发是根据创建任务时设置的执行时间来完成的,但是从用户的角度出发,有些用户允许在一段时间以内完成数据同步,例如一到两个工作日内完成,但是有的用户希望在发布课程后能及时的完成数据同步,例如一小时内完成数据同步,这就需要在代码端对xxl-job的控制中心进行通知,xxl-job也提供了这个接口

打开从github下载的xxl-job项目,在JobInfoController的接口中有体现,通过调用start方法,传入任务的id,来触发一次任务。

下面是代码逻辑


	@Override
	public ReturnT<String> start(int id) {
		XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);

		// valid
		ScheduleTypeEnum scheduleTypeEnum = ScheduleTypeEnum.match(xxlJobInfo.getScheduleType(), ScheduleTypeEnum.NONE);
		if (ScheduleTypeEnum.NONE == scheduleTypeEnum) {
			return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("schedule_type_none_limit_start")) );
		}

		// next trigger time (5s后生效,避开预读周期)
		long nextTriggerTime = 0;
		try {
			Date nextValidTime = JobScheduleHelper.generateNextValidTime(xxlJobInfo, new Date(System.currentTimeMillis() + JobScheduleHelper.PRE_READ_MS));
			if (nextValidTime == null) {
				return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("schedule_type")+I18nUtil.getString("system_unvalid")) );
			}
			nextTriggerTime = nextValidTime.getTime();
		} catch (Exception e) {
			logger.error(e.getMessage(), e);
			return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("schedule_type")+I18nUtil.getString("system_unvalid")) );
		}

		xxlJobInfo.setTriggerStatus(1);
		xxlJobInfo.setTriggerLastTime(0);
		xxlJobInfo.setTriggerNextTime(nextTriggerTime);

		xxlJobInfo.setUpdateTime(new Date());
		xxlJobInfoDao.update(xxlJobInfo);
		return ReturnT.SUCCESS;
	}

上述代码逻辑:
首先根据传入的任务id获取任务,判断任务的触发方式,如果为空,就报错,返回异常码500。
然后设置任务下一次的触发时间,为当前时间的5s后,更新触发状态,返回success。

我们可以在课程发布的逻辑中中调用这个方法,来实现及时同步课程内容。

注意,需要在调用方法头添加注解@PermissionLimit(limit = false),来绕开登录验证,但是这增加了代码的不安全性,需要对这种权限的使用进行限制。


	@RequestMapping("/startJob")
	@ResponseBody
	@PermissionLimit(limit = false)
	public ReturnT<String> startJob(@RequestBody XxlJobInfo jobInfo) {
		return xxlJobService.start(jobInfo.getId());
	}

5、任务流程控制的抽象类

接下来实现抽象类,在这个类中需要提供任务执行的流程,而非具体的代码,提供一个抽象方法,业务代码通过实现这个抽象方法,在这个方法中实现具体的业务执行代码

此时我们已经得到分片总数和分片号,通过查询数据库中记录,自增id模上分片总数等于分片号的方式判断是否由当前执行实例实行

MyBatis-Plus没有提供查询方法,在Mapper中进行实现

@Select("SELECT t.* FROM mq_message t WHERE t.id % #{shardTotal} = #{shardindex} and t.state='0' and t.message_type=#{messageType} limit #{count}")
List<MqMessage> selectListByShardIndex(@Param("shardTotal") int shardTotal, @Param("shardindex") int shardindex, @Param("messageType") String messageType,@Param("count") int count);

在得到消息记录后,这是一个列表的形式,我们开启线程池,使用newFixedThreadPool,线程总数就是任务数,没有临时线程,使用CountDownLatch控制线程完成情况,每个线程中执行process方法,process是一个抽象方法,由具体的实现类进行实现,返回一个boolean变量,表示任务是否完成,并记录日志。


public abstract class MessageProcessAbstract {

    @Autowired
    MqMessageService mqMessageService;

    /**
     * @param mqMessage 执行任务内容
     * @return boolean true:处理成功,false处理失败
     * @description 任务处理
     * @author zkp15
     * @date 2023/9/21 19:47
     */
    public abstract boolean execute(MqMessage mqMessage);

    /**
     * @description 扫描消息表多线程执行任务
     * @param shardIndex 分片序号
     * @param shardTotal 分片总数
     * @param messageType  消息类型
     * @param count  一次取出任务总数
     * @param timeout 预估任务执行时间,到此时间如果任务还没有结束则强制结束 单位秒
     * @return void
     * @author zkp15
     * @date 2023/9/21 20:35
    */
    public void process(int shardIndex, int shardTotal,  String messageType,int count,long timeout) {

        try {
            //扫描消息表获取任务清单
            List<MqMessage> messageList = mqMessageService.getMessageList(shardIndex, shardTotal,messageType, count);
            //任务个数
            int size = messageList.size();
            if(size<=0){
                return ;
            }

            //创建线程池
            ExecutorService threadPool = Executors.newFixedThreadPool(size);
            //计数器
            CountDownLatch countDownLatch = new CountDownLatch(size);
            messageList.forEach(message -> {
                threadPool.execute(() -> {
                    //处理任务
                    try {
                        boolean result = execute(message);
                        if(result){
                            //更新任务状态,删除消息表记录,添加到历史表
                            int completed = mqMessageService.completed(message.getId());
                            if (completed>0){
                                log.debug("任务执行成功:{}",message);
                            }else{
                                log.debug("任务执行失败:{}",message);
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        log.debug("任务出现异常:{},任务:{}",e.getMessage(),message);
                    }
                    //计数
                    countDownLatch.countDown();

                });
            });

            //等待,给一个充裕的超时时间,防止无限等待,到达超时时间还没有处理完成则结束任务
            countDownLatch.await(timeout,TimeUnit.SECONDS);
        } catch (InterruptedException e) {
           e.printStackTrace();
        }
    }
}


6、课程发布的实现类

在这个实现类中,继承上一节的抽象类,以及抽象方法execute,在execute中,分别执行数据库操作,建立缓存,上传分布式文件系统,建立搜索索引。


@Component
public class CoursePublishTask extends MessageProcessAbstract {

	......

    //任务调度入口
    @XxlJob("CoursePublishJobHandler")
    public void coursePublishJobHandler() throws Exception {
        // 分片参数
        int shardIndex = XxlJobHelper.getShardIndex();
        int shardTotal = XxlJobHelper.getShardTotal();
        log.debug("shardIndex="+shardIndex+",shardTotal="+shardTotal);
        //参数:分片序号、分片总数、消息类型、一次最多取到的任务数量、一次任务调度执行的超时时间
        process(shardIndex,shardTotal,"course_publish",30,60);
    }

    //课程发布任务处理
    @Override
    public boolean execute(MqMessage mqMessage) {
        //获取消息相关的业务信息
        String businessKey1 = mqMessage.getBusinessKey1();
        long courseId = Integer.parseInt(businessKey1);
        // 课程发布表
        saveCourseToMQ(mqMessage, courseId);
        // 课程缓存
        saveCourseCache(mqMessage, courseId);
        // 课程静态化
        generateCourseHtml(mqMessage, courseId);
        // 课程索引
        saveCourseIndex(mqMessage, courseId);
        return true;
    }

	......

}

7、总结

本文在实际开发业务场景的基础上,给出了一种遵循AP思想的分布式事务控制方案,通过本地消息表+任务调度的方式实现。

项目亮点有:

  • 本地消息表通过任务123代替具体的任务,结合流程控制抽象类,只给出流程控制的代码,具体的业务实现由具体的实现类完成,从而实现解耦合,提高代码复用。

  • 任务流程控制中开启多实例和多线程,并行高效的执行任务。

  • 使用任务调度XXL-JOB进行任务执行,采用分片广播的方式,保证了任务执行的幂等性。其中控制中心提供了两种任务调度的规则,按照Cron的定时执行策略,和非登录任务执行通知的及时执行策略,为用户提供了多样化的体验

由于篇幅原因,四个小任务的实现,数据库、缓存、文件系统、搜索系统的数据同步,我们放在下一篇继续论述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1810015.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

我的创作纪念日--我和CSDN一起走过的1825天

机缘 第一次在CSDN写文章&#xff0c;是自己在记录学习Java8新特性中Lambda表达式的内容过程中收获的学习心得。之前也有记录工作和生活中的心得体会、难点的解决办法、bug的排查处理过程等等。一直都用的有道笔记&#xff0c;没有去和大家区分享的想法&#xff0c;是一起的朋…

《Brave New Words 》2.4 与历史对话

Part II: Giving Voice to the Social Sciences 第二部分&#xff1a;为社会科学发声 Conversing with History 与历史对话 Good history and civics teachers make the past interesting. Great history and civics teachers make the past come alive. When history and civi…

作业07 递推算法2

作业&#xff1a; #include <iostream> using namespace std; int main(){int a[110][110]{0},b[110][110]{0},n;cin>>n;for(int i1;i<n;i){for(int j1;j<i;j){cin>>a[i][j];}}for(int in-1;i>1;i--){for(int j1;j<i;j){a[i][j]a[i][j]max(a[i1]…

离散数学--连通性和矩阵

目录 0.关系的运算和性质 1.通路和回路 2.连通关系 3.割点&#xff08;边&#xff09;和点&#xff08;边&#xff09;割集 4.强&#xff08;弱&#xff09;连通&单向连通 0.关系的运算和性质 &#xff08;1&#xff09;这个运算包括了矩阵的运算&#xff0c;包括这个…

汽车数据应用构想(三)

上期说的&#xff0c;用数据去拟合停车信息的应用&#xff0c;那么类似的POI信息相关的场景其实都可以实现。今天讲讲用户使用频率也很高的加油/充电场景。 实际应用中&#xff0c;在加油场景中用户关心的通常还是价格。无论是导航还是各种加油APP/小程序&#xff0c;都已经很…

了解常用智能指针

智能指针 1、概念 C中引入智能指针的主要目的是为了解决内存管理的问题&#xff0c;传统的指针&#xff08;裸指针&#xff09;在使用时需要手动分配和释放内存&#xff0c;容易出现内存泄漏和悬挂指针等问题。智能指针通过封装裸指针&#xff0c;并提供自动内存管理功能&…

端午安康,最真挚的祝福送最“粽”要的人

端午节&#xff0c;又称端阳节、龙舟节、重五节、天中节等&#xff0c;是集拜神祭祖、祈福辟邪、欢庆娱乐及饮食为一体的民俗大节&#xff0c;与春节、清明节、中秋节并称为中国四大传统节日&#xff0c;2008年被列为国家法定节假日&#xff0c;2009年9月端午节成为中国首个入选…

笔记 | 软件工程04:软件项目管理

1 软件项目及其特点 1.1 什么是项目 1.2 项目特点 1.3 影响项目成功的因素 1.4 什么是软件项目 针对软件这一特定产品和服务的项目努力开展“软件开发活动",&#xff08;理解&#xff1a;软件项目是一种活动&#xff09; 1.5 软件项目的特点 1.6 军用软件项目的特点 2 …

有点好玩的python运维脚本

python运维脚本 1. 常用端口扫描2. 文件整理 1. 常用端口扫描 在计算机网络中&#xff0c;端口是一个通信端点&#xff0c;允许不同的进程或服务通过网络连接和交换数据。端口通过数值来标识&#xff0c;并与特定的协议相关联。未采取适当安全措施而保持端口开放&#xff0c;可…

ICRA 2024:北京工业大学马楠教授联合中科原动力公司推出番茄采摘自主机器人AHPPEBot,实现32.46秒快速准确采摘

当前&#xff0c;农业生产正深受劳动力短缺困扰&#xff0c;这一现状对生产规模的进一步拓展构成了严重制约。为了突破这一瓶颈&#xff0c;实施自动化已成为提升农业生产力的关键途径&#xff0c;这也使得机器人采收技术备受关注。 现今的机器人采收系统普遍采用先进感知方法&…

linux 网桥学习

前言&#xff1a; 本文来学习一下linux网桥概念和网桥配置 1. linux网桥概念 网桥&#xff0c;类似于中继器&#xff0c;连接局域网中两个或者多个网段。它与中继器的不同之处就在于它能够解析它收发的数据&#xff0c;读取目标地址信息&#xff08;MAC&#xff09;&#xff…

技术玩家实现在不支持的CPU上运行 Windows 10 22H2

最近&#xff0c;AMD 悄然确认&#xff0c;其新款 Ryzen AI 300 系列 APU 将不再为 Windows 10 制作芯片组驱动程序&#xff0c;因为它将终止对Windows 10操作系统的支持&#xff0c;尽管它完全有能力这样做。这意味着想要获得官方驱动程序支持的用户必须在其上运行 Windows 11…

darts 时序预测入门

darts是一个强大而易用的Python时间序列建模工具包。在github上目前拥有超过7k颗stars。 它主要支持以下任务: 时间序列预测 (包含 ARIMA, LightGBM模型, TCN, N-BEATS, TFT, DLinear, TiDE等等) 时序异常检测 (包括 分位数检测 等等) 时间序列滤波 (包括 卡尔曼滤波&#xff0…

【Rd-03E】使用CH340给Rd03_E雷达模块烧录固件

Rd03_E 指导手册 安信可新品雷达模组Rd-03搭配STM32制作简易人体感应雷达灯教程 http://t.csdnimg.cn/mqhkE 测距指导手册网址&#xff1a; https://docs.ai-thinker.com/_media/rd-03e%E7%B2%BE%E5%87%86%E6%B5%8B%E8%B7%9D%E7%94%A8%E6%88%B7%E6%89%8B%E5%86%8C%E4%B8%AD%…

【Android面试八股文】一图展示 Android生命周期:从Activity到Fragment,以及完整的Android Fragment生命周期

图片来源于&#xff1a;https://github.com/xxv/android-lifecycle Android生命周期&#xff1a;从Activity到Fragment 图&#xff1a;android-lifecycle-activity-to-fragments.png 完整的Android Fragment生命周期 图&#xff1a;complete_android_fragment_lifecycle.png…

cve_2022_0543-redis沙盒漏洞复现 vulfocus

1. 原理 该漏洞的存在是因为Debian/Ubuntu中的Lua库是作为动态库提供的。自动填充了一个package变量&#xff0c;该变量又允许访问任意 Lua 功能。 2.复现 我们可以尝试payload&#xff1a; eval local io_l package.loadlib("/usr/lib/x86_64-linux-gnu/liblua5.1.so…

AWT常用组件

AWT中常用组件 前言一、基本组件组件名标签(Label类)Label类的构造方法注意要点 按钮(Button)Button的构造方法注意要点 文本框(TextField)TextField类的构造方法注意要点 文本域&#xff08;TextArea&#xff09;TextArea 的构造方法参数scrollbars的静态常量值 复选框&#x…

文心一言 VS 讯飞星火 VS chatgpt (278)-- 算法导论20.3 5题

五、假设我们创建一个包含 u 1 k u^\frac{1}{k} uk1​ 个簇(而不是全域大小为 x ↓ {\sqrt[↓]{x}} ↓x ​ 的 x ↑ {\sqrt[↑]{x}} ↑x ​ 个簇)的 vEB 树&#xff0c;其每个簇的全域大小为 u 1 − 1 k u ^ {1-\frac{1}{k}} u1−k1​ &#xff0c;其中 k>1 &#xff0c…

【UML用户指南】-13-对高级结构建模-包

目录 1、名称 2、元素 3、可见性 4、引入与引出 用包把建模元素安排成可作为一个组来处理的较大组块。可以控制这些元素的可见性&#xff0c;使一些元素在包外是可见的&#xff0c;而另一些元素要隐藏在包内。也可以用包表示系统体系结构的不同视图。 狗窝并不复杂&#x…

数据库管理-第200期 身边的数据库从业者(20240610)

数据库管理200期 2024-06-10 数据库管理-第200期 身边的数据库从业者&#xff08;20240610&#xff09;首席-薛晓刚院长-施嘉伟邦德-王丁丁强哥-徐小强会长-吴洋灿神-熊灿灿所长-严少安探长-张震总结活动预告 数据库管理-第200期 身边的数据库从业者&#xff08;20240610&#…