DDD 架构分层,MQ消息要放到那一层处理?

news2024/11/23 13:18:57

作者:小傅哥
博客:https://bugstack.cn

沉淀、分享、成长,让自己和他人都能有所收获!😄

本文的宗旨在于通过简单干净实践的方式教会读者,使用 Docker 配置 RocketMQ 并在基于 DDD 分层结构的 SpringBoot 工程中使用 RocketMQ 技术。因为大部分 MQ 的发送都是基于特定业务场景的,所以本章节也是基于 《MyBatis 使用教程和插件开发》 章节的扩展。

本章也会包括关于 MQ 消息的发送和接收应该处于 DDD 的哪一层的实践讲解和使用。

本文涉及的工程:

  • xfg-dev-tech-rocketmq:https://gitcode.net/KnowledgePlanet/road-map/xfg-dev-tech-rocketmq
  • RocketMQ Docker 安装:rocketmq-docker-compose-mac-amd-arm.yml
  • 导入测试库表 road-map.sql

一、案例背景

首先我们要知道,MQ 消息的作用是用于;解耦过长的业务流程应对流量冲击的消峰。如;用户下单支付完成后,拿到支付消息推动后续的发货流程。也可以是我们基于 《MyBatis 使用教程和插件开发》 中的案例场景,给雇员提升级别和薪资的时候,也发送一条MQ消息,用于发送邮件通知给用户。

  • 从薪资调整到邮件发送,这里是2个业务流程,通过 MQ 消息的方式进行连接。
  • 其实MQ消息的使用场景特别多,原来你可能使用多线程的一些操作,现在就扩展为多实例的操作了。发送 MQ 消息出来,让应用的各个实例接收并进行消费。

二、领域事件

因为我们本章所讲解的内容是把 RocketMQ 放入 DDD 架构中进行使用,那么也就引申出领域事件定义。所以我们先来了解下,什么是领域事件。

领域事件,可以说是解耦微服务设计的关键。领域事件也是领域模型中非常重要的一部分内容,用于标示当前领域模型中发生的事件行为。一个领域事件会推进业务流程的进一步操作,在实现业务解耦的同时,也推动了整个业务的闭环。

  • 首先,我们需要在领域模型层,添加一块 event 区域。它的存在是为了定义出于当前领域下所需的事件消息信息。信息的类型可以是model 下的实体对象、聚合对象。
  • 之后,消息的发送是放在基础设置层。本身基础设置层就是依赖倒置于模型层,所以在模型层所定义的 event 对象,可以很方便的在基础设置层使用。而且大部分开发的时候,MQ消息的发送与数据库操作都是关联的,采用的方式是,做完数据落库后,推送MQ消息。所以定义在仓储中实现,会更加得心应手、水到渠成。
  • 最后,就是 MQ 的消息,MQ 的消费可以是自身服务所发出的消息,也可以是外部其他微服务的消息。就在小傅哥所整体讲述的这套简明教程中 DDD 部分的触发器层。

三、环境安装

本案例涉及了数据库和RocketMQ的使用,都已经在工程中提供了安装脚本,可以按需执行。

这里主要介绍 RocketMQ 的安装;

1. 执行 compose yml

文件:docs/rocketmq/rocketmq-docker-compose-mac-amd-arm.yml - 关于安装小傅哥提供了不同的镜像,包括Mac、Mac M1、Windows 可以按需选择使用。

version: '3'
services:
  # https://hub.docker.com/r/xuchengen/rocketmq
  # 注意修改项;
  # 01:data/rocketmq/conf/broker.conf 添加 brokerIP1=127.0.0.1
  # 02:data/console/config/application.properties server.port=9009 - 如果8080端口被占用,可以修改或者添加映射端口
  rocketmq:
    image: livinphp/rocketmq:5.1.0
    container_name: rocketmq
    ports:
      - 9009:9009
      - 9876:9876
      - 10909:10909
      - 10911:10911
      - 10912:10912
    volumes:
      - ./data:/home/app/data
    environment:
      TZ: "Asia/Shanghai"
      NAMESRV_ADDR: "rocketmq:9876"
  • 在 IDEA 中打开 rocketmq-docker-compose-mac-amd-arm.yml 你会看到一个绿色的按钮在左侧侧边栏,点击即可安装。或者你也可以使用命令安装:# /usr/local/bin/docker-compose -f /docs/dev-ops/environment/environment-docker-compose.yml up -d - 比较适合在云服务器上执行。
  • 首次安装可能使用不了,一个原因是 brokerIP1 未配置IP,另外一个是默认的 8080 端口占用。可以按照如下小傅哥说的方式修改。

2. 修改默认配合

  1. 打开 data/rocketmq/conf/broker.conf 添加一条 brokerIP1=127.0.0.1 在结尾
# 集群名称
brokerClusterName = DefaultCluster
# BROKER 名称
brokerName = broker-a
# 0 表示 Master, > 0 表示 Slave
brokerId = 0
# 删除文件时间点,默认凌晨 4 点
deleteWhen = 04
# 文件保留时间,默认 48 小时
fileReservedTime = 48
# BROKER 角色 ASYNC_MASTER为异步主节点,SYNC_MASTER为同步主节点,SLAVE为从节点
brokerRole = ASYNC_MASTER
# 刷新数据到磁盘的方式,ASYNC_FLUSH 刷新
flushDiskType = ASYNC_FLUSH
# 存储路径
storePathRootDir = /home/app/data/rocketmq/store
# IP地址
brokerIP1 = 127.0.0.1
  1. 打开 ``data/console/config/application.properties修改server.port=9009` 端口。
server.address=0.0.0.0
server.port=9009
  • 修改配置后,重启服务。

3. RockMQ登录与配置

3.1 登录

RocketMQ 此镜像,会在安装后在控制台打印登录账号信息,你可以查看使用。

登录:http://localhost:9009/

3.2 创建Topic

  • 也可以使用命令创建:docker exec -it rocketmq sh /home/app/rocketmq/bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t xfg-mq

3.3 创建消费者组

  • 也可以使用命令创建:docker exec -it rocketmq sh /home/app/rocketmq/bin/mqadmin updateSubGroup -n localhost:9876 -c DefaultCluster -g xfg-group

四、工程实现

1. 工程结构

  • MQ 的使用无论是 RocketMQ 还是 Kafka 等,都很简单。但在使用之前,要考虑好怎么在架构中合理的使用。如果最初没有定义好这些,那么胡乱的任何地方都能发送和接收MQ,最后的工程将非常难以维护。
  • 所以这里整个MQ的生产和消费,是按照整个 DDD 领域事件结构进行设计。分为在 domain 使用基础层生产消息,再有 trigger 层接收消息。

2. 配置文件

引入POM

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client-java -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.4</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>

添加配置

# RocketMQ 配置
rocketmq:
  name-server: 127.0.0.1:9876
  consumer:
    group: xfg-group
    # 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
    pull-batch-size: 10
  producer:
    # 发送同一类消息的设置为同一个group,保证唯一
    group: xfg-group
    # 发送消息超时时间,默认3000
    sendMessageTimeout: 10000
    # 发送消息失败重试次数,默认2
    retryTimesWhenSendFailed: 2
    # 异步消息重试此处,默认2
    retryTimesWhenSendAsyncFailed: 2
    # 消息最大长度,默认1024 * 1024 * 4(默认4M)
    maxMessageSize: 4096
    # 压缩消息阈值,默认4k(1024 * 4)
    compressMessageBodyThreshold: 4096
    # 是否在内部发送失败时重试另一个broker,默认false
    retryNextServer: false

3. 定义领域事件

源码cn.bugstack.xfg.dev.tech.domain.salary.event.SalaryAdjustEvent

@EqualsAndHashCode(callSuper = true)
@Data
public class SalaryAdjustEvent extends BaseEvent<AdjustSalaryApplyOrderAggregate> {

    public static String TOPIC = "xfg-mq";

    public static SalaryAdjustEvent create(AdjustSalaryApplyOrderAggregate adjustSalaryApplyOrderAggregate) {
        SalaryAdjustEvent event = new SalaryAdjustEvent();
        event.setId(RandomStringUtils.randomNumeric(11));
        event.setTimestamp(new Date());
        event.setData(adjustSalaryApplyOrderAggregate);
        return event;
    }

}
  • 每个领域的消息,都有领域自己定义。发送的时候再交给基础设施层来发送。

4. 消息发送

源码cn.bugstack.xfg.dev.tech.infrastructure.event.EventPublisher

@Component
@Slf4j
public class EventPublisher {

    @Setter(onMethod_ = @Autowired)
    private RocketMQTemplate rocketmqTemplate;

    /**
     * 普通消息
     *
     * @param topic   主题
     * @param message 消息
     */
    public void publish(String topic, BaseEvent<?> message) {
        try {
            String mqMessage = JSON.toJSONString(message);
            log.info("发送MQ消息 topic:{} message:{}", topic, mqMessage);
            rocketmqTemplate.convertAndSend(topic, mqMessage);
        } catch (Exception e) {
            log.error("发送MQ消息失败 topic:{} message:{}", topic, JSON.toJSONString(message), e);
            // 大部分MQ发送失败后,会需要任务补偿
        }
    }

    /**
     * 延迟消息
     *
     * @param topic          主题
     * @param message        消息
     * @param delayTimeLevel 延迟时长
     */
    public void publishDelivery(String topic, BaseEvent<?> message, int delayTimeLevel) {
        try {
            String mqMessage = JSON.toJSONString(message);
            log.info("发送MQ延迟消息 topic:{} message:{}", topic, mqMessage);
            rocketmqTemplate.syncSend(topic, MessageBuilder.withPayload(message).build(), 1000, delayTimeLevel);
        } catch (Exception e) {
            log.error("发送MQ延迟消息失败 topic:{} message:{}", topic, JSON.toJSONString(message), e);
            // 大部分MQ发送失败后,会需要任务补偿
        }
    }

}
  • 在基础设施层提供 event 事件的处理,也就是 MQ 消息的发送。

源码cn.bugstack.xfg.dev.tech.infrastructure.repository.SalaryAdjustRepository

@Resource
private EventPublisher eventPublisher;
    
@Override
@Transactional(rollbackFor = Exception.class, timeout = 350, propagation = Propagation.REQUIRED, isolation = Isolation.DEFAULT)
public String adjustSalary(AdjustSalaryApplyOrderAggregate adjustSalaryApplyOrderAggregate) {
		 
		// ... 省略部分代码 

    eventPublisher.publish(SalaryAdjustEvent.TOPIC, SalaryAdjustEvent.create(adjustSalaryApplyOrderAggregate));
    return orderId;
}

在 SalaryAdjustRepository 仓储的实现中,做完业务流程开始发送 MQ 消息。这里有2点要注意;

  1. 消息发送,不要写在数据库事务中。因为事务一直占用数据库连接,需要快速释放。
  2. 对于一些强MQ要求的场景,需要在发送MQ前,写入一条数据库 Task 记录,发送消息后更新 Task 状态为成功。如果长时间未更新数据库状态或者为失败的,则需要由任务补偿进行处理。

5. 消费消息

源码cn.bugstack.xfg.dev.tech.trigger.mq.SalaryAdjustMQListener

@Component
@Slf4j
@RocketMQMessageListener(topic = "xfg-mq", consumerGroup = "xfg-group")
public class SalaryAdjustMQListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        log.info("接收到MQ消息 {}", s);
    }

}
  • 消费消息,配置消费者组合消费的主题,之后就可以接收到消息了。接收以后你可以做自己的业务,如果抛出异常,消息会进行重新接收处理。

六、测试验证

1. 单独发送消息测试

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class RocketMQTest {

    @Setter(onMethod_ = @Autowired)
    private RocketMQTemplate rocketmqTemplate;

    @Test
    public void test() throws InterruptedException {
        while (true) {
            rocketmqTemplate.convertAndSend("xfg-mq", "我是测试消息");
            Thread.sleep(3000);
        }
    }

}
  • 这里方便你来发送消息,验证流程。

2. 业务流程消息验证

@Test
public void test_execSalaryAdjust() throws InterruptedException {
    AdjustSalaryApplyOrderAggregate adjustSalaryApplyOrderAggregate = AdjustSalaryApplyOrderAggregate.builder()
            .employeeNumber("10000001")
            .orderId("100908977676003")
            .employeeEntity(EmployeeEntity.builder().employeeLevel(EmployeePostVO.T3).employeeTitle(EmployeePostVO.T3).build())
            .employeeSalaryAdjustEntity(EmployeeSalaryAdjustEntity.builder()
                    .adjustTotalAmount(new BigDecimal(100))
                    .adjustBaseAmount(new BigDecimal(80))
                    .adjustMeritAmount(new BigDecimal(20)).build())
            .build();
    String orderId = salaryAdjustApplyService.execSalaryAdjust(adjustSalaryApplyOrderAggregate);
    log.info("调薪测试 req: {} res: {}", JSON.toJSONString(adjustSalaryApplyOrderAggregate), orderId);
    Thread.sleep(Integer.MAX_VALUE);
}
23-07-29.15:40:52.307 [main            ] INFO  HikariDataSource       - HikariPool-1 - Start completed.
23-07-29.15:40:52.445 [main            ] INFO  EventPublisher         - 发送MQ消息 topic:xfg-mq message:{"data":{"employeeEntity":{"employeeLevel":"T3","employeeTitle":"T3"},"employeeNumber":"10000001","employeeSalaryAdjustEntity":{"adjustBaseAmount":80,"adjustMeritAmount":20,"adjustTotalAmount":100},"orderId":"100908977676004"},"id":"98117654515","timestamp":"2023-07-29 15:40:52.425"}
23-07-29.15:40:52.517 [main            ] INFO  ISalaryAdjustApplyServiceTest - 调薪测试 req: {"employeeEntity":{"employeeLevel":"T3","employeeTitle":"T3"},"employeeNumber":"10000001","employeeSalaryAdjustEntity":{"adjustBaseAmount":80,"adjustMeritAmount":20,"adjustTotalAmount":100},"orderId":"100908977676004"} res: 100908977676004
23-07-29.15:40:52.520 [ConsumeMessageThread_1] INFO  SalaryAdjustMQListener - 接收到MQ消息 {"data":{"employeeEntity":{"employeeLevel":"T3","employeeTitle":"T3"},"employeeNumber":"10000001","employeeSalaryAdjustEntity":{"adjustBaseAmount":80,"adjustMeritAmount":20,"adjustTotalAmount":100},"orderId":"100908977676004"},"id":"98117654515","timestamp":"2023-07-29 15:40:52.425"}
  • 当执行一次加薪调整后,就会接收到MQ消息了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/888104.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Nginx反向代理技巧

跨域 作为一个前端开发者来说不可避免的问题就是跨域&#xff0c;那什么是跨域呢&#xff1f; 跨域&#xff1a;指的是浏览器不能执行其他网站的脚本。它是由浏览器的同源策略造成的&#xff0c;是浏览器对javascript施加的安全限制。浏览器的同源策略是指协议&#xff0c;域名…

三维模型OSGB格式轻量化纹理压缩关键技术分析

三维模型OSGB格式轻量化纹理压缩关键技术分析 在三维模型应用中&#xff0c;纹理是一个十分重要的因素&#xff0c;可以使得模型更加真实、精细。随着移动设备和网络传输速度的限制&#xff0c;纹理数据也需要进行轻量化处理&#xff0c;而OSGB格式纹理压缩是一种常见且有效的技…

Android Studio使用OkHttp网络通信出现okio问题

最近使用Android Studio做个App&#xff0c;一开始集成的httpclient&#xff0c;但是Android对其支持性不是很好了&#xff0c;所以换成了okhttp。 集成okhttp 1、我在本地仓库&#xff0c;拷贝出来&#xff0c;放到系统的libs目录下。 2、选定okhttp-4.1.0.jar&#xff0c;右…

电视机看板大屏适配问题——js基础积累

直接上效果图&#xff1a; 下面直接写代码&#xff1a; 1.html部分的代码 <body><div class"container"><!-- 数据展示区域 --><div class"box"><div class"top">top</div><div class"bottom&…

Linux知识点 -- Linux多线程(一)

Linux知识点 – Linux多线程&#xff08;一&#xff09; 文章目录 Linux知识点 -- Linux多线程&#xff08;一&#xff09;一、理解线程1.从资源角度理解线程2.执行流3.多线程编程4.线程的资源5.线程切换的成本更低6.线程的优缺点7.线程异常 二、线程控制1.clone函数2.线程异常…

深度学习入门-2-开源开放平台

一、深度学习框架 1.简介 深度学习在很多机器学习任务中都有着非常出色的表现&#xff0c;在图像识别、语音识别、自然语言处理、机器人、网络广告投放、医学自动诊断和金融等领域都有着广泛应用。面对繁多的应用场景&#xff0c;深度学习框架有助于建模者聚焦业务场景和模型…

AutoHotkey:定时删除目录下指定分钟以前的文件,带UI界面

删除指定目录下&#xff0c;所有在某个指定分钟以前的文件&#xff0c;可以用来清理经常生成很多文件的目录&#xff0c;但又需要保留最新的一部分文件 支持拖放目录到界面 应用场景比如&#xff1a;游戏定时存档&#xff0c;日志目录、监控文件目录等 关于这个删除后备份&am…

Spring Cloud Feign MultipartFile文件上传踩坑之路(包含前端文件上传请求、后端文件保存到aliyun-oss文件服务器)

Spring Cloud Feign MultipartFile文件上传踩坑之路总结 一、前端文件上传 文件上传组件用的是ant-design的a-upload组件&#xff0c;我的界面如下所示&#xff1a; 文件上传请求API: FileUtils.js import axios from "axios"const uploadApi ({file, URL, onUp…

NLP序列标注问题,样本不均衡怎么解决?

【学而不思则罔&#xff0c;思而不学则殆】 1.问题 NLP序列标注问题&#xff0c;样本不均衡怎么解决&#xff1f; 2.解释 以命名实体识别&#xff08;NER&#xff09;为例&#xff0c;这个样本不均衡有两种解释&#xff1a; &#xff08;1&#xff09;实体间类别数量不均衡…

从外部访问K8s中Pod的五种方式

hostNetwork、 hostPort、 NodePort、 LoadBalancer、 Ingress 暴露Pod与Service一样&#xff0c;因为Pod就是Service的backend 1、hostNetwork&#xff1a;true 这是一种直接定义 Pod 网络的方式。 如果在 Pod 中使用 hostNetwork:true 配置&#xff0c; pod 中运行的应用程序…

[计算机入门] 设置屏幕分辨率

3.1 设置屏幕分辨率 3.1.1 屏幕分辨率介绍 屏幕分辨率&#xff0c;是指屏幕上显示的像素个数&#xff0c;它决定了显示图像的清晰度和精细度。屏幕分辨率通常以水平像素数和垂直像素数来衡量&#xff0c;例如1600 x 1200。 在相同大小的屏幕上&#xff0c;当屏幕分辨率较低时…

vue中的路由缓存和解决方案

路由缓存的原因 解决方法 推荐方案二&#xff0c;使用钩子函数beforeRouteUpdate&#xff0c;每次路由更新前执行

[自学记录06|*百人计划]Gamma矫正与线性工作流

一、前言 Gamma矫正其实也属于我前面落下的一块内容&#xff0c;打算把它补上&#xff0c;其它的没补是因为我之前写的GAMES101笔记里已经涵盖了&#xff0c;而Gamma矫正在101里面确实没提到&#xff0c;于是打算把它补上&#xff0c;这块内容并不难&#xff0c;但是想通透的理…

什么是CSS预处理器?请列举几个常见的CSS预处理器。

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ CSS预处理器是什么&#xff1f;⭐ 常见的CSS预处理器⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅&#xff01;这个专栏是…

FPGA_学习_14_第一个自写模块的感悟和ila在线调试教程与技巧(寻找APD的击穿偏压)

前一篇博客我们提到了&#xff0c;如果要使用算法找到Vbr&#xff0c;通过寻找APD采集信号的噪声方差的剧变点去寻找Vbr是一个不错的方式。此功能的第一步是在FPGA中实现方差的计算&#xff0c;这个我们已经在上一篇博客中实现了。 继上一篇博客之后&#xff0c;感觉过了很久了…

这场大学生竞赛中,上百支队伍与合合信息用AI共克难题

目录 0 校企联合共克难题1 北京林业大学&#xff1a;文档格式转换2 浙江中医药大学&#xff1a;个性化题库3 中南林业科技大学&#xff1a;交互场景挖掘4 重庆邮电大学&#xff1a;大模型赋能智能文档5 总结 0 校企联合共克难题 近日&#xff0c;中国大学生服务外包创新创业大…

如何使用CSS实现一个响应式视频播放器?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 使用CSS实现响应式视频播放器⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅&#xff01;这个专栏是为那些对Web开发感兴趣…

avue多选列表根据后端返回的某个值去判断是否选中;avue-curd多选回显

效果如上&#xff1a; getSiteList().then(res > {//列表数据this.siteData res.data.datathis.$nextTick(()>{this.siteData.forEach(item>{//业务条件if(item.configid&&item.configid!0&&item.configid>0){//符合条件时调用选中的方法this.$…

一文科普,买股票加杠杆是怎么回事?其利弊表现在哪?

买股票加杠杆是一种投资策略&#xff0c;通过借入资金来增加投资额度&#xff0c;进而放大投资回报。这种策略的利弊主要表现在以下几个方面。 首先&#xff0c;加杠杆的主要利表现在于放大投资回报。借入的资金可以投资更多的股票&#xff0c;当股票价格上涨时&#xff0c;投资…

XDR解决方案三

XDR未来的进化之路 精细化、智能化、个性化和场景化 当前XDR未来发展方向的问题分享了他的洞察和思考。刘庆林认为&#xff0c;精细化、智能化、个性化和场景化将是未来XDR的重要方向。 首先&#xff0c;实现精准防护的唯一路径&#xff0c;就是要从云、管、边、端、人五个维…