RocketMq详解:三、RocketMq通用生产和消费方法改造

news2024/11/24 4:32:04

文章目录

  • 1.背景
  • 2.通用方法改造
    • 2.1添加maven依赖
    • 2.2 RocketMq基础配置
    • 2.3 配置类
    • 2.5 消息传输的对象和结果
    • 2.4 消息生产者
    • 2.5 消息消费者
    • 2.6 功能测试

1.背景

在第二章:《RocketMq详解:二、SpringBoot集成RocketMq》中我们已经实现了消费基本生产和消费的实现,但是在真实的开发环境中如果按照这种方式去实现,冗余代码较多,且通过实现RocketMQListeneronMessage的方法去完成消息消费无返回结果,在后期的流程中不易维护,因此,本章将对这些问题进行二次改造和优化。

为了防止新同学从头开始学,本章将如何配置和实现简单在复述一下,至于具体怎么安装RocketMq,本文提供两种安装方法:

  • 《MacOS环境下RocketMQ安装及部署 RocketMQ Dashboard 可视化》
  • 《docker安装rocketMq》

2.通用方法改造

2.1添加maven依赖

		<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-spring-boot-starter</artifactId>
			<version>2.2.3</version>
		</dependency>

2.2 RocketMq基础配置

接着,在application.yml或application.properties文件中配置RocketMQ的相关参数,如NameServer地址、生产者组、消费者组等:

rocketmq:
  name-server: 127.0.0.1:9876
  # 生产者
  producer:
    group: boot_group_1
    # 消息发送超时时间
    send-message-timeout: 3000
    # 消息最大长度4M
    max-message-size: 4096
    # 消息发送失败重试次数
    retry-times-when-send-failed: 3
    # 异步消息发送失败重试次数
    retry-times-when-send-async-failed: 2
  # 消费者
  consumer:
    group: boot_group_1
    # 每次提取的最大消息数
    pull-batch-size: 5

上面的配置如果是在分布式环境下也可以配置在Apollonacos等配置中心里进行动态配置

2.3 配置类

在配置类中主要定义两个Bean的加载,即RocketMQTemplateDefaultMQProducer,主要是提供消息发送的能力,即生产消息;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author ninesun
 * @ClassName RocketMqConfig
 * @description: 消息中间件配置类
 * @date 2024年05月19日
 * @version: 1.0
 */
@Configuration
public class RocketMqConfig {

    @Value("${rocketmq.name-server}")
    private String nameServer;

    @Value("${rocketmq.producer.group}")
    private String producerGroup;

    @Value("${rocketmq.producer.send-message-timeout}")
    private Integer sendMsgTimeout;

    @Value("${rocketmq.producer.max-message-size}")
    private Integer maxMessageSize;

    @Value("${rocketmq.producer.retry-times-when-send-failed}")
    private Integer retryTimesWhenSendFailed;

    @Value("${rocketmq.producer.retry-times-when-send-async-failed}")
    private Integer retryTimesWhenSendAsyncFailed;

    @Bean
    public RocketMQTemplate rocketMqTemplate() {
        RocketMQTemplate rocketMqTemplate = new RocketMQTemplate();
        rocketMqTemplate.setProducer(defaultMqProducer());
        return rocketMqTemplate;
    }

    @Bean
    public DefaultMQProducer defaultMqProducer() {
        DefaultMQProducer producer = new DefaultMQProducer();
        producer.setNamesrvAddr(this.nameServer);
        producer.setProducerGroup(this.producerGroup);
        producer.setSendMsgTimeout(this.sendMsgTimeout);
        producer.setMaxMessageSize(this.maxMessageSize);
        producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
        producer.setRetryTimesWhenSendAsyncFailed(this.retryTimesWhenSendAsyncFailed);
        return producer;
    }
}

在编写消费者和生产者之前,我们先统一一下消息传输的对象,以及消费的结果

2.5 消息传输的对象和结果

  • 基本传输对象
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class MessageDTO<T> {
    private T data;
    private Integer delayTime;
    @NotBlank(message = "Topic must not be blank")
    private String topic;
    @NotBlank(message = "Key must not be blank")
    private String key;
    private List<String> tags;
    private String messageType;
}
  • 消费结果枚举
public enum MsgRetryStatus {


    SUCCEED(-1), FAILURE(0), RETRY(0), RETRY_1S(1), RETRY_5S(2),
    RETRY_10S(3), RETRY_30S(4), RETRY_1M(5), RETRY_2M(6), RETRY_3M(7),
    RETRY_4M(8), RETRY_5M(9), RETRY_6M(10), RETRY_7M(11), RETRY_8M(12),
    RETRY_9M(13), RETRY_10M(14), RETRY_20M(15), RETRY_30M(16), RETRY_1H(17), RETRY_2H(18),
    RETRY_1D(19), RETRY_3D(20),  RETRY_7D(21), RETRY_14D(22), RETRY_21D(23),  RETRY_28D(24),
    RETRY_35D(25);

    int level;

    MsgRetryStatus(int level) {
        this.level = level;
    }

    public int getLevel() {
        return level;
    }
}

2.4 消息生产者

@Component
public class MessageProduct {
    @Resource
    private RocketMQTemplate rocketMqTemplate;

    public SendResult SendMessage(MessageDTO data) {
        // 创建一个Message对象
        org.springframework.messaging.Message<?> message = MessageBuilder.withPayload(JSON.toJSONString(data))
                .setHeader(RocketMQHeaders.KEYS,data.getKey())
                .setHeader(RocketMQHeaders.TAGS,data.getTags())
                .build();
        return rocketMqTemplate.syncSendDelayTimeSeconds(data.getTopic(), message, data.getDelayTime());
    }
}

2.5 消息消费者

在原始的实现中,我们通过实现RocketMQListener接口中的onMessage方法来完成消息的消费。

然而,这种方式存在一个问题:如果消息消费失败,我们无法获取到返回结果,也不便于进行错误处理和重试。

为了优化这个问题,我们可以使用@RocketMQMessageListener注解的returnTopic属性来指定一个返回主题,当消息消费失败时,将消息发送到这个返回主题中。

同时,我们可以创建一个专门的处理失败消息的消费者来处理这些返回的消息。

另外,我们还可以在onMessage方法中添加异常处理逻辑,以便在消费失败时进行错误处理和记录日志。

新增一个抽象类CommonConsumer去实现RocketMQListener,并提供一个有返回值的doConsumerProcess方法,去实现具体的消费逻辑,具体实现如下:

@Slf4j
@Component
public abstract class CommonConsumer implements RocketMQListener<MessageDTO> {
    public void onMessage(MessageDTO message) {
        log.info("收到延迟消息成功,消息体:{}", message);
        doConsumerProcess(message);
    }

    public abstract MsgRetryStatus doConsumerProcess(MessageDTO messageDTO);
}

以boot-mq-topic为例,我们实现具体的消费,新增一个对象BootMqConsumer继承我们的CommonConsumer,来实现具体的消费逻辑

@Slf4j
@Component
public abstract class CommonConsumer implements RocketMQListener<MessageDTO> {
    public void onMessage(MessageDTO message) {

        try {
            // 处理消息的逻辑
            log.info("收到延迟消息成功,消息体:{}", message);
            MsgRetryStatus msgRetryStatus = doConsumerProcess(message);
            if (MsgRetryStatus.RETRY.equals(msgRetryStatus)
                    || MsgRetryStatus.FAILURE.equals(msgRetryStatus)) {
                //TODO 消费失败或重试 则发送重试Topic
            }
        } catch (Exception e) {
            // 记录错误日志
            e.printStackTrace();
            // 可以选择将失败消息发送到指定Topic
            this.sendReturnMessgae();
        }
    }

    public abstract MsgRetryStatus doConsumerProcess(MessageDTO messageDTO);

    private void doRetrytConsumerProcess() {
        //TODO:待实现,后面补上
    }
}

2.6 功能测试

package com.example.demo.controller;

import com.alibaba.fastjson.JSON;
import com.example.demo.annoation.Idempotent;
import com.example.demo.mq.producer.MessageProduct;
import com.example.demo.po.MessageDTO;
import com.example.demo.po.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Collections;
import java.util.UUID;

@RestController
@Slf4j
@Validated
public class TestController01 {
    @Resource
    private RocketMQTemplate rocketMqTemplate;

    @Resource
    private DefaultMQProducer defaultMqProducer;

    @Resource
    private MessageProduct messageProduct;

    @GetMapping("/send/msg4")
    public String sendMsg4() {
        try {
            User user = User.builder()
                    .id(1)
                    .name("ninesun")
                    .build();
            MessageDTO<User> messageDTO = MessageDTO.<User>builder()
                    .data(user)
                    .delayTime(3)
                    .topic("boot-mq-topic")
                    .key(String.valueOf(UUID.randomUUID()))
                    .build();
            SendResult sendResult = messageProduct.SendMessage(messageDTO);
            log.info("msgId:{},sendStatus:{},data:{}", sendResult.getMsgId(), sendResult.getSendStatus(), JSON.toJSONString(messageDTO));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "OK";
    }
}

附:User对象

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class User {
    private Integer id;
    private String name;
    private String desc;
}

访问:/send/msg4接口,结果如下:
在这里插入图片描述

至此,我们便可以在真实环境中很容易的去集成消息队列实现功能的解耦,流量的削峰。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2211764.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CISCN2022-cactus

这周在疯狂学kernel pwn。 记录一下这题&#xff0c;race conditonmsg_msgpipe_buffer&#xff0c;kaslrsmepsmapkpti。 漏洞很简单&#xff0c;所有操作都没加锁&#xff0c;就是race condition了。edit什么的都只能2次。 很明显了&#xff0c;一次泄露基址&#xff0c;一次劫…

unidbg console debugger 调试技巧

版权归作者所有&#xff0c;如有转发&#xff0c;请注明文章出处&#xff1a;https://cyrus-studio.github.io/blog/ 打开debug日志 编辑 unidbg-android/src/test/resources/log4j.properties 把 log4j.logger.com.github.unidbg.AbstractEmulator 改为 DEBUG 当运行报错时…

lenovo联想 ThinkPad E14 Gen 2,E15 Gen 2 AMD(20T6,20T7,20T8,20T9)原厂Win10系统镜像下载

适用机型&#xff1a;【20T6、20T7、20T8、20T9】 链接&#xff1a;https://pan.baidu.com/s/1AVTvmiIHjafsFw8P7_jMPg?pwdzux5 提取码&#xff1a;zux5 联想原装WIN系统自带所有驱动、出厂主题壁纸、系统属性联机支持标志、系统属性专属LOGO标志、Office办公软件、联想电脑…

C#实现CRC32算法

CRC32 是一种校验和算法&#xff0c;用于检测消息是否未被修改。 它被广泛使用&#xff1a;例如&#xff0c;计算以太网发送包校验和。 public class CRC32 {private static readonly uint[] Crc32Table new uint[256];static CRC32(){uint i, j;uint crc;for (i 0; i < …

《深度学习》OpenCV 风格迁移、DNN模块 案例解析及实现

目录 一、风格迁移 1、什么是风格迁移 2、步骤 1&#xff09;训练 2&#xff09;迁移 二、DNN模块 1、什么是DNN模块 2、DNN模块特点 1&#xff09;轻量 2&#xff09;外部依赖性低 3&#xff09;方便 4&#xff09;集成 5&#xff09;通用性 3、流程图 4、图像…

软件设计之Redis(1)

软件设计之Redis(1) 路线图推荐&#xff1a; 【Java学习路线-极速版】【Java架构师技术图谱】 尚硅谷Redis零基础到进阶&#xff0c;最强redis7教程&#xff0c;阳哥亲自带练&#xff08;附redis面试题&#xff09; 资料可以去尚硅谷官网免费领取 学习内容&#xff1a; Redi…

Unity3D 观察者模式

Unity3D 泛型事件系统 观察者模式 观察者模式是一种行为设计模式&#xff0c;通过订阅机制&#xff0c;可以让对象触发事件时&#xff0c;通知多个其他对象。 在游戏逻辑中&#xff0c;UI 界面通常会监听一些事件&#xff0c;当数据层发生变化时&#xff0c;通过触发事件&am…

【JavaSE基础】Java 变量

为什么需要变量 变量是程序的基本组成单位 class Test{public static void main(String[] args){int a 1; //定义一个变量&#xff0c;类型为int&#xff0c;变量名为a&#xff0c;并赋值为1int b 3; //定义另一个变量&#xff0c;类型为int&#xff0c;变量名为b&#xff0…

sqli-labs less-25 and/or绕过

来到less-25 我们可以看到下面有提示&#xff0c;Hint: Your Input is Filtered with following result: 说明本关卡有过滤&#xff0c; 构造 http://192.168.140.130/sq/Less-25/?id1’ 页面报错&#xff0c;从报错可以得知闭合方式为,所以 用注释符&#xff0c;发现注释符…

oracle数据坏块处理(一)-通过rman备份修复

表有坏块时&#xff0c;全表查询会报错&#xff1a; 这时候如果有前面正常的rman备份&#xff0c;那么我们就可以通过rman备份直接对数据文件块做恢复 先对数据文件做个逻辑检查&#xff1a; RMAN> backup check logical VALIDATE DATAFILE EXB_DATA/exb/datafile/cuteinf…

公开课 | 2024最新清华大模型公开课 第3课 神经网络与大模型基础 Part 2

本文由readlecture.cn转录总结。ReadLecture专注于音、视频转录与总结&#xff0c;2小时视频&#xff0c;5分钟阅读&#xff0c;加速内容学习与传播。 大纲 神经网络概述 神经网络的概念 神经网络的应用方式 序列建模与神经网络架构 循环神经网络&#xff08;RNN&#xff09;…

UE5模型导入面板解读

1.Skeletal Mesh&#xff1a; 是一个可以让模型动起来的选项&#xff0c;适用于需要动画的角色或生物。是否勾选&#xff1a;如果导入的是一个需要动画的角色或生物&#xff0c;就勾选 Skeletal Mesh 选项&#xff1b;如果是静态物体&#xff0c;就不勾选。 2.Build Nanite&a…

集合类HashMap,HashTable,ConcurrentHashMap区别?

1.HashMap 简单来说&#xff0c;HashMap由数组链表组成的&#xff0c;数组是HashMap的主体&#xff0c;链表则是主要为了解决哈希冲突而存在的&#xff0c;如果定位到的数组位置不含链表&#xff08;当前entry的next指向null&#xff09;,那么对于查找&#xff0c;添加等操作很…

VS中创建QT项目。

一&#xff0c;安装QT&#xff0c; 重点&#xff1a;在安装QT的时候要安装msvc201x版本的组件&#xff0c; 二 &#xff0c; 安装 qt-vs-tools Index of /development_releases/vsaddin/2.8.1 三。安装 win10sdk&#xff0c;这是因为我的当前电脑是win10的&#xff0c; 安装版…

【逗号绕过】

简介 所以为了避免逗号被过滤&#xff0c;我们来看看如何绕过叭 一、From for 绕过 我们直接看一个题目&#xff1a; id1 页面输出hello user id1 and 11%23 页面返回hello user id1 and 11%23 页面不返回数据符合盲注&#xff0c;并且是一个数字型的sql注入&#xff0c;尝…

13.梯度下降法的代码实战——举足轻重的模型优化算法

引言 通过12.梯度下降法的具体解析——举足轻重的模型优化算法-CSDN博客的学习&#xff0c;我们已经了解到了梯度下降法的整体流程与不同分类。归根结底&#xff0c;我们最终是要使用代码实现梯度下降法。 通过阅读本篇博客&#xff0c;你可以&#xff1a; 1.知晓轮次和批次…

Unity URP 如何实现遮挡显示 (全流程教程)

嗨~~&#xff01;&#xff0c;熊猫老师又来了 &#xff0c;这次为大家分享项目中非常实用的一个技术点&#xff1a;遮挡显示。 老规矩&#xff0c;上才艺&#xff1a; 实现原理 &#xff1a;对模型渲染两次。 第一次&#xff1a; 正常渲染物体&#xff0c;深度测试不通过的情况…

【工具】HTTrack:网站一键克隆下载,实现离线浏览与备份的利器

什么是 HTTrack&#xff1f; HTTrack 是一款用于复制完整网站的开源工具&#xff0c;它可以从服务器下载整个网站的内容&#xff0c;包括 HTML 文件、图像、样式表、脚本等资源。通过这种方式&#xff0c;你可以在离线状态下浏览网站&#xff0c;就像在线一样。 HTTrack 支持…

设备台账管理是什么

设备管理对企业至关重要。比如在电子加工企业&#xff0c;高效的设备管理能减少设备故障&#xff0c;提升生产效率&#xff0c;为企业赢得市场竞争优势。设备台账管理作为设备管理的一个核心部分&#xff0c;起着重要的作用。 让我们一起从本篇文章中探索设备台账管理是什么&a…

[STM32] 简单介绍 (一)

文章目录 1.STM32简介2.ARM3.STM32F103ZET6/STM32F103C8T64.STM32命名规则5.STM32最小系统板6.STM32开发方式7.STM32系统架构8.STM32时钟系统9.STM32中断系统10.STM32定时器 1.STM32简介 STM32是ST公司基于ARM Cortex-M内核开发的32位微控制器&#xff1b; STM32常应用在嵌入式…