549、RocketMQ详细入门教程系列 -【消息队列之 RocketMQ(三)】 2023.02.28

news2025/3/1 9:43:05

目录

    • 一、Spring 整合 RocketMQ
      • 1.1 消息生产者
      • 1.2 消息消费者
      • 1.3 Spring 配置文件
      • 1.4 运行实例程序
    • 二、参考链接

一、Spring 整合 RocketMQ

不同于 RabbitMQ、ActiveMQ、Kafka 等消息中间件,Spring 社区已经通过多种方式提供了对这些中间件产品集成,例如通过 spring-jms 整合 ActiveMQ、通过 Spring AMQP 项目下的 spring-rabbit 整合 RabbitMQ、通过 spring-kafka 整合 kafka ,通过他们可以在 Spring 项目中更方便使用其 API 。目前在 Spring 框架中集成 RocketMQ 有三种方式,一是将消息生产者和消费者定义成 bean 对象交由 Spring 容器管理,二是使用 RocketMQ 社区的外部项目 rocketmq-jms(https://github.com/apache/rocketmq-externals/tree/master/rocketmq-jms)然后通过 spring-jms 方式集成使用,三是如果你的应用是基于 spring-boot 的,可以使用 RocketMQ 的外部项目 rocketmq-spring-boot-starter(https://github.com/apache/rocketmq-externals/tree/master/rocketmq-spring-boot-starter)比较方便的收发消息。
总的来讲 rocketmq-jms 项目实现了 JMS 1.1 规范的部分内容,目前支持 JMS 中的发布/订阅模型收发消息。rocketmq-spring-boot-starter 项目目前已经支持同步发送、异步发送、单向发送、顺序消费、并行消费、集群消费、广播消费等特性,如果比较喜欢 Spring Boot 这种全家桶的快速开发框架并且现有特性已满足业务要求可以使用该项目。当然从 API 使用上最灵活的还是第一种方式,下面以第一种方式为例简单看下Spring 如何集成 RocketMQ 的。

1.1 消息生产者

package org.study.mq.rocketMQ.spring;

import org.apache.log4j.Logger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;

public class SpringProducer {

    private Logger logger = Logger.getLogger(getClass());

    private String producerGroupName;

    private String nameServerAddr;

    private DefaultMQProducer producer;

    public SpringProducer(String producerGroupName, String nameServerAddr) {
        this.producerGroupName = producerGroupName;
        this.nameServerAddr = nameServerAddr;
    }

    public void init() throws Exception {
        logger.info("开始启动消息生产者服务...");

        //创建一个消息生产者,并设置一个消息生产者组
        producer = new DefaultMQProducer(producerGroupName);
        //指定 NameServer 地址
        producer.setNamesrvAddr(nameServerAddr);
        //初始化 SpringProducer,整个应用生命周期内只需要初始化一次
        producer.start();

        logger.info("消息生产者服务启动成功.");
    }

    public void destroy() {
        logger.info("开始关闭消息生产者服务...");

        producer.shutdown();

        logger.info("消息生产者服务已关闭.");
    }

    public DefaultMQProducer getProducer() {
        return producer;
    }
}

消息生产者就是把生产者 DefaultMQProducer 对象的生命周期分成构造函数、init、destroy 三个方法,构造函数中将生产者组名、NameServer 地址作为变量由 Spring 容器在配置时提供,init 方法中实例化 DefaultMQProducer 对象、设置 NameServer 地址、初始化生产者对象,destroy 方法用于生产者对象销毁时清理资源。

1.2 消息消费者

package org.study.mq.rocketMQ.spring;

import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

public class SpringConsumer {

    private Logger logger = Logger.getLogger(getClass());

    private String consumerGroupName;

    private String nameServerAddr;

    private String topicName;

    private DefaultMQPushConsumer consumer;

    private MessageListenerConcurrently messageListener;

    public SpringConsumer(String consumerGroupName, String nameServerAddr, String topicName, MessageListenerConcurrently messageListener) {
        this.consumerGroupName = consumerGroupName;
        this.nameServerAddr = nameServerAddr;
        this.topicName = topicName;
        this.messageListener = messageListener;
    }


    public void init() throws Exception {
        logger.info("开始启动消息消费者服务...");

        //创建一个消息消费者,并设置一个消息消费者组
        consumer = new DefaultMQPushConsumer(consumerGroupName);
        //指定 NameServer 地址
        consumer.setNamesrvAddr(nameServerAddr);
        //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //订阅指定 Topic 下的所有消息
        consumer.subscribe(topicName, "*");

        //注册消息监听器
        consumer.registerMessageListener(messageListener);

        // 消费者对象在使用之前必须要调用 start 初始化
        consumer.start();

        logger.info("消息消费者服务启动成功.");
    }

    public void destroy(){
        logger.info("开始关闭消息消费者服务...");

        consumer.shutdown();

        logger.info("消息消费者服务已关闭.");
    }

    public DefaultMQPushConsumer getConsumer() {
        return consumer;
    }

}

同消息生产者类似,消息消费者是把生产者 DefaultMQPushConsumer 对象的生命周期分成构造函数、init、destroy 三个方法,具体含义在介绍 Java 访问 RocketMQ 实例时已经介绍过了,不再赘述。当然,有了消费者对象还需要消息监听器在接收到消息后执行具体的处理逻辑。

package org.study.mq.rocketMQ.spring;

import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.util.List;

public class MessageListener implements MessageListenerConcurrently {

    private Logger logger = Logger.getLogger(getClass());

    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (list != null) {
            for (MessageExt ext : list) {
                try {
                    logger.info("监听到消息 : " + new String(ext.getBody(), "UTF-8"));
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

}

消息监听器类就是把前面 Java 示例中注册消息监听器时声明的匿名内部类代码抽取出来定义成单独一个类而已。

1.3 Spring 配置文件

因为只使用 Spring 框架集成,所以除了 Sping 框架核心 jar 包外不需要额外添加依赖包了。本例中将消息生产者和消息消费者分成两个配置文件,这样能更好的演示收发消息的效果。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.3.xsd">
        
    <bean id="producer" class="org.study.mq.rocketMQ.spring.SpringProducer" init-method="init" destroy-method="destroy">
        <constructor-arg name="nameServerAddr" value="localhost:9876"/>
        <constructor-arg name="producerGroupName" value="spring_producer_group"/>
    </bean>
</beans>

消息生产者配置很简单,定义了一个消息生产者对象,该对象初始化时调用 init 方法,对象销毁前执行 destroy 方法,将 Name Server 地址和生产者组配置好。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.3.xsd">

    <bean id="messageListener" class="org.study.mq.rocketMQ.spring.MessageListener" />

    <bean id="consumer" class="org.study.mq.rocketMQ.spring.SpringConsumer" init-method="init" destroy-method="destroy">
        <constructor-arg name="nameServerAddr" value="localhost:9876"/>
        <constructor-arg name="consumerGroupName" value="spring_consumer_group"/>
        <constructor-arg name="topicName" value="spring-rocketMQ-topic" />
        <constructor-arg name="messageListener" ref="messageListener" />
    </bean>

</beans>

消息消费者同消息生产者配置类似,多了一个消息监听器对象的定义和绑定。

1.4 运行实例程序

按前述步骤 启动 Name Server 和 Broker,接着运行消息生产者和消息消费者程序,简化起见我们用两个单元测试类模拟这两个程序:

package org.study.mq.rocketMQ.spring;

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SpringProducerTest {

    private ApplicationContext container;

    @Before
    public void setup() {
        container = new ClassPathXmlApplicationContext("classpath:spring-producer.xml");
    }

    @Test
    public void sendMessage() throws Exception {
        SpringProducer producer = container.getBean(SpringProducer.class);

        for (int i = 0; i < 20; i++) {
            //创建一条消息对象,指定其主题、标签和消息内容
            Message msg = new Message(
                    "spring-rocketMQ-topic",
                    null,
                    ("Spring RocketMQ demo " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息内容 */
            );

            //发送消息并返回结果
            SendResult sendResult = producer.getProducer().send(msg);

            System.out.printf("%s%n", sendResult);
        }
    }
}

SpringProducerTest 类模拟消息生产者发送消息。

package org.study.mq.rocketMQ.spring;

import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SpringConsumerTest {

    private ApplicationContext container;

    @Before
    public void setup() {
        container = new ClassPathXmlApplicationContext("classpath:spring-consumer.xml");
    }

    @Test
    public void consume() throws Exception {
        SpringConsumer consumer = container.getBean(SpringConsumer.class);

        Thread.sleep(200 * 1000);

        consumer.destroy();
    }
}

SpringConsumerTest 类模拟消息消费者者接收消息,在 consume 方法返回之前需要让当前线程睡眠一段时间,使消费者程序继续存活才能监听到生产者发送的消息。

分别运行 SpringProducerTest 类 和 SpringConsumerTest 类,在 SpringConsumerTest 的控制台能看到接收的消息:
在这里插入图片描述
假如启动两个 SpringConsumerTest 类进程,因为它们属于同一消费者组,在 SpringConsumerTest 的控制台能看到它们均摊到了消息:
在这里插入图片描述
在这里插入图片描述

二、参考链接

[01] 消息队列之 RocketMQ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/379007.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

WebRTC Opus编码器的创建与参数细节分析( sdp -> native )

这几天在做一些WebRTC音频改进方面的调查工作&#xff0c;在阅读Chromium源码的过程中&#xff0c;就顺便记录下来&#xff0c;便于日后回顾。本文基于Chromium 85源码分析&#xff0c;由于Chromium的快速发展&#xff0c;很有可能不适合于跨度太大的Chromium版本。大家知道Opu…

QT学习14:QtXlsx操作Excel表

一、前言操作excel方式有&#xff1a;QAxObject 和QtXlsx区别&#xff1a;Qt自带的QAxObject库操作excel的前提是电脑已经安装微软的Office&#xff08;包含EXCEL&#xff09;&#xff0c;而QtXlsx可以直接使用免装Office且操作更简单。二、QtXlsx操作示例参考&#xff1a;http…

C筑基——深入理解内存对齐

目录1 前言2 正文2.1 为什么要有内存对齐&#xff1f;2.2 内存对齐原则2.2.1 基本数据类型是自然对齐的2.2.2 包含基本数据类型成员的结构体套用结构体内存对齐原则来分析使用 gdb 查看这两个结构体的成员内存位置结构体类型变量是自然对齐的吗&#xff1f;2.2.3 数组类型2.3 修…

今天 4 点,龙蜥自动化运维平台SysOM 2.0的诊断中心功能介绍 | 第 66-68 期

本周 3 期「龙蜥大讲堂」预告来啦&#xff01;我们邀请了系统运维 SIG Contributor 阙建明分享《SysOM 2.0 诊断中心功能介绍》&#xff0c;龙蜥社区云原生机密计算 SIG Maintainer、Intel 高级云计算软件工程师黄晓军分享《Intel HE Toolkit 介绍》主题演讲&#xff0c;龙蜥社…

【Linux】操作系统与Linux — Linux概述、组成及目录结构

目录 一、什么是操作系统&#xff1f;都有那些&#xff1f; 二、Linux概述 三、Linux组成 三、Linux目录结构 四、Linux目录结构 &#x1f49f; 创作不易&#xff0c;不妨点赞&#x1f49a;评论❤️收藏&#x1f499;一下 一、什么是操作系统&#xff1f;都有那些&#x…

频率信号转电压或电流信号隔离变送器0-1KHz /0-5KHz /0-10KHz转0-2.5V/0-5V/0-20mA

主要特性:>> 精度等级&#xff1a;0.2级>> 全量程内极高的线性度&#xff08;非线性度<0.1%&#xff09; >> 辅助电源/信号输入/信号输出&#xff1a; 2500VDC 三隔离>> 辅助电源&#xff1a;5VDC&#xff0c;12VDC&#xff0c;24VDC等单电源供电&g…

2020蓝桥杯真题约数个数(填空题) C语言/C++

题目描述 本题为填空题&#xff0c;只需要算出结果后&#xff0c;在代码中使用输出语句将所填结果输出即可。 1200000 有多少个约数&#xff08;只计算正约数&#xff09;。 运行限制 最大运行时间&#xff1a;1s 最大运行内存: 128M 所需变量 int a 1200000;//初始最大数 i…

模式识别 —— 第一章 贝叶斯决策理论

模式识别 —— 第一章 贝叶斯决策理论 前言 新的学期开始了&#xff0c;当然是要给不爱吃香菜的月亮记录学习笔记呀~ 没多久了&#xff0c;待夏花绚烂之时~人山人海&#xff0c;我们如约而至&#xff01; 以后清河海风 溶溶月色 共赏之人 就在身侧 mua~ 文章目录模式识别 —…

【服务器数据恢复】HP EVA存储lun丢失的数据恢复案例

服务器故障&检测&分析&#xff1a; 某品牌EVA存储设备中的RAID5磁盘有两块硬盘掉线&#xff0c;lun丢失。硬件工程师对故障服务器进行物理故障检测&#xff0c;发现掉线硬盘能够正常读取&#xff0c;无物理故障&#xff0c;也没有发现坏道。 故障服务器掉线硬盘没有物理…

深度理解Redux原理并实现一个redux

Redux的作用是什么 Redux的作用在于实现状态传递、状态管理。在这里你可能会说了&#xff0c;如果是状态传递&#xff0c;那我props的传递不也是可以达到这样的效果吗&#xff1f;context上下文方案不也是可以达到这样的效果吗&#xff1f;没错&#xff0c;是这样的&#xff0…

汇川SV660N与基恩士 KV7500 控制器调试说明

1. 伺服相关部分配置 1.1 伺服相关版本 SV660N 试机建议使用“SV660N-Ecat_v0.09.xml”及以上设备描述文件。 SV660N 单板软件版本建议为“H0100901.4”及更高版本号。 1.2 相关参数说明 SV660N 对象字典中 60FD 的含义较 IS620N 有所更改&#xff1a;bit0、1、2 分别为负限位…

移动字母--降维与DFS

一、题目描述 2x3=6 个方格中放入 ABCDE 五个字母,右下角的那个格空着。如下图所示。 和空格子相邻的格子中的字母可以移动到空格中,比如,图中的 C 和 E 就可以移动,移动后的局面分别是: A B D E C A B C D E 为了表示方便,我们把 6 个格子中字母配置用一个串表示出…

如何创建出实用的员工手册?

员工手册主要是企业内部的人事制度管理规范&#xff0c;包含企业规章制度和企业文化&#xff0c;同时还起到了展示企业形象、传播企业文化的作用。它既覆盖了企业人力资源管理的各个方面规章制度的主要内容&#xff0c;又因适应企业独特个性的经营发展需要而弥补了规章制度制定…

【HTML】列表结构

列表结构HTML效果HTML <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdev…

HIVE --- zeppelin安装

目录 把zeppelin压缩包拷贝到虚拟机里面 解压 改名 修改配置文件 编辑zeppelin-site.xml—将配置文件的ip地址和端口号进行修改 编辑 zeppelin-env.sh—添加JDK和Hadoop环境 配置环境变量 刷新环境变量 拷贝Hive文件 拷贝外部文件 启动zeppelin 启动Hadoop&Hi…

Web API接口鉴权方式

一、什么是鉴权&#xff1f;为什么要鉴权 鉴权&#xff08;authentication&#xff09;&#xff0c;也叫做认证&#xff0c;即验证用户是否拥有访问系统的权利。 HTTP本身是无状态的请求&#xff0c;每次请求都是一次性的&#xff0c;并不会知道请求前后发生了什么。但在很多…

记一次linux服务器磁盘空间占满的问题排查

问题&#xff1a;服务器安装后两天&#xff0c;发现磁盘空间使用满了【date: write error: No space left on device】问题排查&#xff1a;1、使用df -hl命令查看2、使用du -hl --max-depth1&#xff0c;从根目录开始查起&#xff0c;最后发现&#xff0c;磁盘的空间全部被/va…

自学5个月Java找到了9K的工作,我的方式值得大家借鉴 第二部分

我的学习心得&#xff0c;我认为能不能自学成功的要素有两点。 第一点就是自身的问题&#xff0c;虽然想要转行学习Java的人很多&#xff0c;但是非常强烈的想要转行学好的人是小部分。而大部分人只是抱着试试的心态来学习Java&#xff0c;这是完全不可能的。所以能不能学成Jav…

【Linux】项目的自动化构建-make/makefile

&#x1f4a3;1.背景会不会写makefile&#xff0c;从一个侧面说明了一个人是否具备完成大型工程的能力 一个工程中的源文件不计数&#xff0c;其按类型、功能、模块分别放在若干个目录中&#xff0c;makefile定义了一系列的 规则来指定&#xff0c;哪些文件需要先编译&#xff…

Java List系列(ArrayList、LinekdList 以及遍历中删除重复元素时发生的异常和解决办法)

目录List集合系列List系列集合特点List集合特有方法List集合的遍历方式ArrayList集合的底层原理分析源码LinkedList集合的底层原理集合的并发修改异常问题&#xff08;删除重复元素时&#xff09;List集合系列 List系列集合特点 ArrayList、LinekdList &#xff1a;有序&#…