微服务开发系列 第七篇:RocketMQ

news2025/1/23 3:27:46

总概

A、技术栈

  • 开发语言:Java 1.8
  • 数据库:MySQL、Redis、MongoDB、Elasticsearch
  • 微服务框架:Spring Cloud Alibaba
  • 微服务网关:Spring Cloud Gateway
  • 服务注册和配置中心:Nacos
  • 分布式事务:Seata
  • 链路追踪框架:Sleuth
  • 服务降级与熔断:Sentinel
  • ORM框架:MyBatis-Plus
  • 分布式任务调度平台:XXL-JOB
  • 消息中间件:RocketMQ
  • 分布式锁:Redisson
  • 权限:OAuth2
  • DevOps:Jenkins、Docker、K8S

B、本节实现目标

  • [mall-order]下单,用RocketMQ消息中间件发送消息,[mall-member]监听消费给用户加积分

一、RocketMQ安装

供参考:

  • 保姆级教程 Windows11下安装RocketMQ

  • RocketMQ基础入门

二、功能描述

用户下单(mall-order服务)后,发送下单事件MQ, mall-member服务监听消费MQ,为用户增加积分,MQ此处的作用是解耦。

三、代码实现

3.1 maven加RocketMQ依赖包

在项目[mall-pom]的pom.xml里加入RocketMQ依赖包

<rocketmq.version>2.2.3</rocketmq.version>

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${rocketmq.version}</version>
</dependency>

3.2 common.yml配置RocketMQ参数

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: group-${spring.profiles.active}
    send-message-timeout: 3000 # 消息发送超时时长,默认3s
    retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
    retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2

common.yml完整配置

spring:
  redis:
    database: 0
    host: 127.0.0.1
    port: 6379a
    password: 123abc
    jedis:
      pool:
        max-active: 500  #连接池的最大数据库连接数。设为0表示无限制
        max-idle: 20   #最大空闲数
        max-wait: -1
        min-idle: 5
    timeout: 1000
    redisson: 
      password: 123abc
      cluster:
        nodeAddresses: ["redis://127.0.0.1:6379"]
      single:
        address: "redis://127.0.0.1:6379"
        database: 0
    
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://192.168.100.51:3306/ac_db?serverTimezone=Asia/Shanghai&useUnicode=true&tinyInt1isBit=false&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowPublicKeyRetrieval=true
    username: ac_u
    password: ac_PWD_123

    #hikari数据库连接池
    hikari:
      pool-name: YH_HikariCP
      minimum-idle: 10 #最小空闲连接数量
      idle-timeout: 600000 #空闲连接存活最大时间,默认600000(10分钟)
      maximum-pool-size: 100 #连接池最大连接数,默认是10
      auto-commit: true  #此属性控制从池返回的连接的默认自动提交行为,默认值:true
      max-lifetime: 1800000 #此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟
      connection-timeout: 30000 #数据库连接超时时间,默认30秒,即30000
      connection-test-query: SELECT 1

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: group-${spring.profiles.active}
    send-message-timeout: 3000 # 消息发送超时时长,默认3s
    retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
    retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2

mybatis-plus:
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

3.3 [mall-order]生产者

生产者OrderSender

package com.ac.order.mq.send;

import com.ac.common.qm.MqTopicConstant;
import com.ac.common.qm.msg.MqOrderMsg;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Slf4j
@Lazy
@Component
public class OrderSender {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public void asyncSend(MqOrderMsg mqMsg) {
        String payload = JSONObject.toJSONString(mqMsg);

        //Topic+Tag更精准接收消息
        String destination = MqTopicConstant.TOPIC_ORDER + ":" + mqMsg.getAction().getCode();

        rocketMQTemplate.asyncSend(destination, payload, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info(OrderSender.class.getSimpleName() + ",消息发送成功, result: {}", sendResult);
            }

            @Override
            public void onException(Throwable e) {
                log.error(OrderSender.class.getSimpleName() + ",消息发送失败");
                e.printStackTrace();
            }
        });
    }
}

下单发送MQ

@Slf4j
@Service
public class OrderServiceImpl implements OrderService {

    @Resource
    private OrderDao orderDaoImpl;

    @Resource
    private MemberFeignApi memberFeignApi;

    @Resource
    private OrderItemService orderItemServiceImpl;

    @Resource
    private OrderSender orderSender;

    @Override
    public OrderDetailDTO findOrderDetail(Long id) {
        return null;
    }

    @Override
    public IPage<OrderDTO> pageOrder(OrderPageQry qry) {
        return orderDaoImpl.pageOrder(qry);
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public Long createOrder(OrderAddVO addVO) {
        Order order = new Order();
        order.setOrderNo(RandomUtil.randomNumbers(8));

        //省略支付流程
        order.setOrderState(OrderStateEnum.PAYED);
        order.setOrderTime(LocalDateTime.now());

        //通过feign取用户信息
        MemberDTO member = memberFeignApi.findMember(addVO.getMemberId());
        order.setMemberId(addVO.getMemberId());
        order.setMemberName(member.getMemberName());
        order.setMobile(member.getMobile());
        orderDaoImpl.save(order);

        BigDecimal discountAmount = new BigDecimal(0.00);
        BigDecimal productAmount = new BigDecimal(0.00);
        //存订单项信息
        for (OrderItemAddVO orderItemAdd : addVO.getOrderItemList()) {
            OrderItem orderItem = orderItemServiceImpl.addOrderItem(order.getId(), orderItemAdd);
            productAmount = productAmount.add(orderItem.getBuyPrice().multiply(new BigDecimal(orderItem.getBuyNum())));
        }

        //更新订单金额信息
        order.setDiscountAmount(discountAmount);
        order.setProductAmount(productAmount);
        BigDecimal payAmount = productAmount.subtract(discountAmount);
        order.setPayAmount(payAmount);
        orderDaoImpl.updateById(order);

        //发送下单MQ
        MqOrderMsg mqMsg = MqOrderMsg.builder()
                .action(MqOrderAction.PAID)
                .orderId(order.getId())
                .memberId(order.getMemberId())
                .payAmount(order.getPayAmount())
                .build();
        orderSender.asyncSend(mqMsg);

        return order.getId();
    }
}

3.4 [mall-member]消费者

MemberOrderListener消费者

package com.ac.member.mq.listener;

import com.ac.common.qm.MqTopicConstant;
import com.ac.common.qm.MqConsumerConstant;
import com.ac.common.qm.msg.MqOrderAction;
import com.ac.common.qm.msg.MqOrderMsg;
import com.ac.member.component.MemberIntegralComponent;
import com.ac.member.enums.IntegralSourceTypeEnum;
import com.ac.member.vo.IntegralLogEditVO;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Slf4j
@Service
@RocketMQMessageListener(
        consumerGroup = MqConsumerConstant.CONSUMER_MEMBER_ORDER,
        topic = MqTopicConstant.TOPIC_ORDER,
        selectorExpression = "PAID||REFUND",
        messageModel = MessageModel.CLUSTERING)
public class MemberOrderListener implements RocketMQListener<MessageExt> {

    @Resource
    private MemberIntegralComponent memberIntegralComponent;

    @Override
    public void onMessage(MessageExt message) {
        MqOrderMsg mqMsg = JSONObject.parseObject(message.getBody(), MqOrderMsg.class);
        log.info(MemberOrderListener.class.getSimpleName() + ",msgId={},msg={}", message.getMsgId(), mqMsg);
        try {
            //Topic+Tag更精准接收消息
            MqOrderAction action = mqMsg.getAction();
            if (MqOrderAction.PAID == action) {
                dealPaid(mqMsg);
            } else if (MqOrderAction.REFUND == action) {
                dealRefund(mqMsg);
            }
        } catch (Exception e) {
            log.error(MemberOrderListener.class.getSimpleName() + ",消费失败,mqMsg={},e={}", mqMsg, e.getMessage());
        }
    }

    /**
     * 处理订单付款事件
     *
     * @param mqMsg
     */
    private void dealPaid(MqOrderMsg mqMsg) {
        IntegralLogEditVO integralVO = new IntegralLogEditVO();
        integralVO.setMemberId(mqMsg.getMemberId());
        integralVO.setSourceType(IntegralSourceTypeEnum.AWARD_ORDER);
        integralVO.setSourceRemark("下单获得积分");
        integralVO.setIntegral(mqMsg.getPayAmount().longValue());

        memberIntegralComponent.recordIntegral(integralVO);
    }

    private void dealRefund(MqOrderMsg mqMsg) {
        log.info("处理退单事件");
    }
}

四、测试

4.1 下单

下单

4.2 控制台日志

[mall-order]控制台MQ发送日志:

2023-04-04 15:58:37.052  INFO 25204 --- [ublicExecutor_1] com.ac.order.mq.send.OrderSender         : OrderSender,消息发送成功, result: SendResult [sendStatus=SEND_OK, msgId=7F000001627418B4AAC212E0B7F30000, offsetMsgId=AC100B8D00002A9F000000000003B369, messageQueue=MessageQueue [topic=TOPIC_ORDER, brokerName=LAPTOP-R0R80SCR, queueId=3], queueOffset=0]

[mall-member]控制台MQ接收日志:

2023-04-04 15:58:37.243  INFO 26788 --- [_MEMBER_ORDER_1] c.a.m.mq.listener.MemberOrderListener    : MemberOrderListener,msgId=7F000001627418B4AAC212E0B7F30000,msg=MqOrderMsg(action=PAID, orderId=281635594240001, memberId=264260572479489, payAmount=40.50)

4.3 数据库记录

t_order t_member_integral

t_member_integral_log

4.4 RocketMQ Dashboard

Dashboard列表

Dashboard消息内容

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/570603.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

华为OD机试真题 Java 实现【打印文件】【2023Q1 100分】

一、题目描述 有 5 台打印机打印文件&#xff0c;每台打印机有自己的待打印队列。 因为打印的文件内容有轻重缓急之分&#xff0c;所以队列中的文件有1~10不同的优先级&#xff0c;其中数字越大优先级越高。 打印机会从自己的待打印队列中选择优先级最高的文件来打印。 如果…

Linux常用命令——help命令

在线Linux命令查询工具 help 显示帮助信息 补充说明 help命令用于显示shell内部命令的帮助信息。help命令只能显示shell内部的命令帮助信息。而对于外部命令的帮助信息只能使用man或者info命令查看。 语法 help(选项)(参数)选项 -s&#xff1a;输出短格式的帮助信息。仅…

冯斌:突破认知临界点,打造自驱型团队 | 开发者说

熟悉冯斌的人&#xff0c;大都直接称呼其网名 Kid&#xff0c;包括他在 ONES 的同事。人如其名&#xff0c;Kid 的寓意就是「用孩子的眼光看世界」&#xff0c;返璞归真的思维方式才能发现新大陆。正如毕加索说的&#xff1a;「我一生都在向孩子学习。」 在 ONES 联合创始人兼 …

设计师解放双手之作!3秒生成风景园林效果图,AIGC赋能景观设计

‍ 项目简介 在过去几十年&#xff0c;风景园林经历了从“刀耕火种”的完全手绘设计时代到当下比较流行的参数化设计时代&#xff0c;过去的每一轮技术革新都让风景园林作品的表现形式产生了巨大的改变。随着计算机图像技术的发展&#xff0c;我们有更多的建模和渲染软件辅助提…

Python中深拷贝与浅拷贝介绍

用赋值的时候的一些注意事项 a "< meta name“Keywords” content“小博测试成长之路” />" b a c bprint(id("< meta name“Keywords” content“小博测试成长之路” />")) print(id(a)) print(id(b)) print(id(c)) print(a is b) # a和b…

面试题百日百刷-java基础篇(九)

锁屏面试题百日百刷&#xff0c;每个工作日坚持更新面试题。请看到最后就能获取你想要的,接下来的是今日的面试题&#xff1a; 1.Java 中IO 流分为几种? 按照流的流向分&#xff0c;可以分为输入流和输出流&#xff1b; 按照操作单元划分&#xff0c;可以划分为字节流和字符…

BGP 基础知识学习笔记

今天海翎光电的小编为大家介绍一下BGP的相关基础知识&#xff0c;文章浅显易懂&#xff0c;适合对BGP完全没有了解的同学。 BGP&#xff08;边界网关协议&#xff09;是将互联网联合在一起的路由协议&#xff0c;海翎光电的小编将解释在哪些情况下我们需要 BGP 以及它是如何工作…

为什么你工作很努力却没有晋升?分析晋升的关键点!

见字如面&#xff0c;我是军哥&#xff01; 昨天有一位二线大厂的程序员读者和我吐槽&#xff0c;说马上公司一波人晋升&#xff0c;可是晋升名额并么有他&#xff0c;但是他在这家公司已经干了三年了&#xff0c;工作也很努力&#xff0c;996 更是家常便饭&#xff0c;难道是大…

C语言小游戏的实现——扫雷(使用C语言基础语法)

前言 结合前边我们所学的C语言知识&#xff0c;本期我们将使用C语言实现一个简单的小游戏——扫雷 目录 前言 总体框架设计 多文件分装程序 各功能模块化实现 初始化棋盘 棋盘打印 埋雷 判赢与排雷 游戏逻辑安排 总结 总体框架设计 和三子棋相同&#xff0c;游戏开始时…

109.(cesium篇)cesium椎体上下跳动+旋转

地图之家总目录(订阅之前请先查看该博客) 地图之家:cesium+leaflet+echart+地图数据+地图工具等相关内容的介绍 文章末尾处提供保证可运行完整代码包,运行如有问题,可“私信”博主。 效果如下所示: 下面献上完整代码,代码重要位置会做相应解释 <html lang="en…

python+django高校疫情防控管理系统vue

随着信息化时代的到来&#xff0c;管理系统都趋向于智能化、系统化&#xff0c;高校疫情防控管理系统也不例外&#xff0c;但目前国内的有些学校仍都使用人工管理&#xff0c;学校规模越来越大&#xff0c;同时信息量也越来越庞大&#xff0c;人工管理显然已无法应对时代的变化…

屏幕挂灯是不是智商税?明基ScreenBar Halo屏幕挂灯初体验

目录 一、屏幕挂灯是不是智商税&#xff1f;二、文心一言眼里的屏幕挂灯1、明基ScreenBar Halo屏幕挂灯2、屏幕挂灯和普通台灯哪个好&#xff1f; 三、屏幕挂灯初体验四、使用体验五、无线控制器六、专业角度分析1、屏幕工作照明&#xff0c;不是随便一盏灯就可以2、引导光线照…

记录--超长溢出头部省略打点,坑这么大,技巧这么多?

这里给大家分享我在网上总结出来的一些知识&#xff0c;希望对大家有所帮助 在业务中&#xff0c;有这么一种场景&#xff0c;表格下的某一列 ID 值&#xff0c;文本超长了&#xff0c;正常而言会是这样&#xff1a; 通常&#xff0c;这种情况都需要超长省略溢出打点&#xff0…

2023网络安全工程师面试宝典(附答案)

2023年即将过去一半&#xff0c;先来灵魂三连问&#xff1a; 年初定的目标完成多少了&#xff1f;薪资涨了吗&#xff1f;女朋友找到了吗&#xff1f; ​好了&#xff0c;不扎大家的心了&#xff0c;接下来进入正文。 1、SQL注入的原理是什么&#xff1f; SQL注入攻击是通过将…

1722_PolySpace Bug Finder的几种启动方式

全部学习汇总&#xff1a; GreyZhang/g_matlab: MATLAB once used to be my daily tool. After many years when I go back and read my old learning notes I felt maybe I still need it in the future. So, start this repo to keep some of my old learning notes servral …

【算法】使用数位算法生成0至某个数之间的整数(for循环之外的另一种实现方式,蛮长见识的)

导入&#xff1a; 对某个整数进行遍历&#xff0c;按常规的编程思维都是 for(int i0;i<number;i){} 但是如果这个数比较大&#xff0c;大到无法的话&#xff0c;可能使用普通for循环方式进行遍历就有些吃力了。 那么针对这个问题&#xff0c;可以考虑深度搜索算法dfs来辅助完…

Linux--ServerProgramming--(1)TCP\IP协议族

1.TCP/IP 协议族 1.1 TCP/IP协议族及主要协议 TCP/IP 协议族是一个四层协议系统。自上而下为&#xff08;如下图所示&#xff09;&#xff1a;应用层传输层网络层数据链路层 应用层负责处理应用程序逻辑&#xff0c;在用户空间实现。&#xff08;少数服务器程序在内核中实现。…

快速上手kettle(一)壶之简介

Linux核心命令系列文章目录 快速上手kettle&#xff08;一&#xff09;&#xff1a;壶之简介 快速上手kettle&#xff08;二&#xff09;&#xff1a;Kettle初体验&#xff08;博主正在玩命更新中&#xff09; 快速上手kettle&#xff08;三&#xff09;&#xff1a;Kettle转换…

艾迪普发布新一代国产化“3D引擎+工具+平台”,加速释放数字内容生产力

‍数据智能产业创新服务媒体 ——聚焦数智 改变商业 艾迪普的2023新产品发布会在5月20日在北京隆重举行&#xff0c;该发布会以"向新出发 智见未来"为主题&#xff0c;艾迪普重磅推出了新一代实时三维图形图像引擎IDP Engine 4.0、iVis无代码编程数字孪生应用开发工…

谷歌云开启GPU算力狂飙,驱动AIGC时代加速到来

‍数据智能产业创新服务媒体 ——聚焦数智 改变商业 随着人工智能的飞速发展&#xff0c;尤其是大型AI模型、AIGC的崛起&#xff0c;对AI算力产生了巨大的需求。以GPU为核心的算力供给&#xff0c;已经成为大模型、AIGC乃至整个智能产业发展的关键基础设施。因此&#xff0c;对…