RocketMQ快速入门:集成spring, springboot实现各类消息消费(七)附带源码

news2025/1/13 15:55:29

0. 引言

rocketmq支持两种消费模式:pull和push,在实际开发中这两种模式分别是如何实现的呢,在spring框架和springboot框架中集成有什么差异?今天我们一起来探究这两个问题。

1. java client实现消息消费

1、添加依赖

		<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.8.0</version>
        </dependency>

1.1 Push消息消费

rocketmq的push消费是通过pull模式为基础来进行模拟的,就是通过监听器,不断的pull来实现,因此其实现重点就是实现监听器

rocektmq的监听器支持2种:

  • MessageListenerConcurrently 拉取到新消息之后就提交到线程池去消费
  • MessageListenerOrderly 通过加分布式锁和本地锁保证同时只有一条线程去消费一个队列上的数据,以此保证顺序消费

这里虽然还有MessageListener类型,实际上是上述两种的父类,该方法也被弃用了
在这里插入图片描述
所以push模式的的重点就是实现MessageListenerConcurrently监听器,其内部只有一个consumeMessage方法
在这里插入图片描述
那么实现的重点就是consumeMessage方法,这里我们睡眠了10s,用于模拟该监听器运行10s

public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");

        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 集群消费模式
        consumer.setMessageModel(MessageModel.CLUSTERING);

        // 设置topic
        consumer.subscribe("topic_test", "*");

        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : list) {
                    String topic = msg.getTopic();
                    try {
                        String messageBody = new String(msg.getBody(), "utf-8");
                        System.out.println(topic+":"+messageBody);
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者实例
        consumer.start();

        Thread.sleep(10000);
    }

当然,如上的形式只能用于我们单元测试使用,集成在生产中时肯定不能这样用,我们需要将其注册为bean形式,并在项目启动时进行调用,让其注册为监听器

@Component
public class Consumer1PushListener implements MessageListenerConcurrently {

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        for (MessageExt msg : list) {
            String topic = msg.getTopic();
            try {
                String messageBody = new String(msg.getBody(), "utf-8");
                System.out.println(topic+":"+messageBody);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    @PostConstruct
    public void init(){
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 集群消费模式
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 注册监听器
        consumer.registerMessageListener(this);
        try{
            // 设置topic
            consumer.subscribe("topic_test", "*");
            // 启动示例
            consumer.start();
        }catch (Exception e){
            e.printStackTrace();
            System.out.println("rocketmq 消费者启动失败");
        }
    }
}

我们启动项目,发送一条消息,会发现消费者可以实时消费

在这里插入图片描述
消息模式如何调整?
rocektmq 有集群模式和广播模式两种消息模式,如果需要调整的话,通过消费者的setMessageModel方法即可调整

// 集群模式
consumer.setMessageModel(MessageModel.CLUSTERING);
// 广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);

1.2 Pull消息消费

pull模式的实现更加简单,直接查看pull消费者类DefaultMQPullConsumer,其下有pull方法
在这里插入图片描述
官方给出的示例代码如下:

public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.start();
        try {
            MessageQueue mq = new MessageQueue();
            mq.setQueueId(0);
            mq.setTopic("topic_test");
            mq.setBrokerName("Broker");
            long offset = 26;
            PullResult pullResult = consumer.pull(mq, "*", offset, 32);
            if (pullResult.getPullStatus().equals(PullStatus.FOUND)) {
                System.out.printf("%s%n", pullResult.getMsgFoundList());
                consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        consumer.shutdown();
    }

但是截止目前,该类已经被弃用了
在这里插入图片描述
更加推荐的是用DefaultLitePullConsumer类实现,其下的poll方法可以帮助我们更加方便的实现消息消费,这里需要注意,两个类,一个是pull,一个是poll,pull实际上是需要指定偏移量的,而poll则自动帮我们更新了偏移量

public static void main(String[] args) throws MQClientException {
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("group2");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("topic_test", "*");
        consumer.start();
        try {
            List<MessageExt> messageList = consumer.poll(3000);
            for (MessageExt message : messageList) {
                System.out.println("pull消费:"+new String(message.getBody()));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        consumer.shutdown();
    }

发送几条消息,运行测试
在这里插入图片描述
生产中使用时,大家可以把DefaultLitePullConsumer定义为bean, 以此减少每次资源创建的消耗,具体方式可参考上述push模式的实现代码

1.3 顺序消息消费

rocketmq中提供了两种消费处理形式:并发消费(MessageListenerConcurrently)和顺序消费(MessageListenerOrderly

并发消费消费者会创建多个线程同时消费队列消息,而顺序消费流程跟并发消费最大的区别在于,顺序消费对要处理的队列加锁,确保同一队列,同一时间,只允许一个消费线程处理

我们在之前消息发送的章节已经提前体验过顺序消费代码实现了,通过上述对监听器类型的描述,我们也能知道顺序消费的实现,就是实现MessageListenerOrderly监听器

public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");

        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 集群消费模式
        consumer.setMessageModel(MessageModel.CLUSTERING);

        // 设置topic
        consumer.subscribe("topic_order", "*");

        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerOrderly() {
              @Override
              public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                  byte[] body = list.get(0).getBody();
                  System.out.println("接收消息:"+new String(body, StandardCharsets.UTF_8));
                  return ConsumeOrderlyStatus.SUCCESS;
              }
        });

        // 启动消费者实例
        consumer.start();

        Thread.sleep(10000);
    }

2. springboot实现消息消费

1、添加依赖

		<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.0</version>
        </dependency>

2、修改配置项

rocketmq:
  name-server: localhost:9876
  producer:
    group: group_test # 生产者分组,事务消息会使用
    send-message-timeout: 3000 # 消息发送超时时长,默认3s
    retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
    retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2

2.1 push消息消费

通过实现RocketMQListener<T>接口,其中T是泛型,及消息内容的数据类型,可以是String, JSONObject,也可以是自定义数据结构类型

将监听器声明为bean,并实现onMessage方法即可

@Component
@RocketMQMessageListener(topic = "topic_test", consumerGroup = "group_test")
public class MessageListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        System.out.println("消费消息:" + s);
    }
}

注解中的messageModel属性可以用来设置消息模式,默认为集群模式

在这里插入图片描述

2.2 pull消息消费

添加消费者配置

rocketmq:
  name-server: localhost:9876
  consumer:
    group: "group_test"
    topic: "topic_test"

通过receive方法实现消费

 @GetMapping(value = "/poll")
    public void poll() {
        List<String> list = rocketMQTemplate.receive(String.class);
        for (String message : list) {
            System.out.println("poll消费:"+message);
        }
    }

2.3 顺序消息消费

与普通消息不同的是,要声明消费模式为顺序消费consumeMode= ConsumeMode.ORDERLY

@Component
@RocketMQMessageListener(topic = "topic_order", consumerGroup = "group_order", consumeMode= ConsumeMode.ORDERLY)
public class MessageOrderListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        System.out.println("顺序消费消息:" + s);
    }
}

3. 总结

消息消费相对更加简单,实际上掌握一种之后,其他类型的也就能够举一反三了,本文也只是针对最常用的类型进行列举,还有更多参数的支持,需要大家在实际应用中探索。

本文演示源码见:https://gitee.com/wuhanxue/wu_study/tree/master/demo/rocketmq_demo

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1845295.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

React+TS前台项目实战(十三)-- 全局常用响应式加载动画Loading组件封装

文章目录 前言Loading组件1. 功能分析2. 代码详细注释3. 使用方式4. 不同尺寸loading动画效果展示 总结 前言 高阶组件有几大优点&#xff0c;其中一个就是渲染劫持&#xff0c;如懒加载&#xff0c;是否显示该元素loading&#xff0c;这在项目中我们经常用到。毫无疑问&#…

【原创】springboot+mysql员工工资管理系统设计与实现

个人主页&#xff1a;程序猿小小杨 个人简介&#xff1a;从事开发多年&#xff0c;Java、Php、Python、前端开发均有涉猎 博客内容&#xff1a;Java项目实战、项目演示、技术分享 文末有作者名片&#xff0c;希望和大家一起共同进步&#xff0c;你只管努力&#xff0c;剩下的交…

Map-JAVA面试常问

1.HashMap底层实现 底层实现在jdk1.7和jdk1.8是不一样的 jdk1.7采用数组加链表的方式实现 jdk1.8采用数组加链表或者红黑树实现 HashMap中每个元素称之为一个哈希桶(bucket),哈希桶包含的内容有以下4项 hash值&#xff08;哈希函数计算出来的值&#xff09; Key value next(…

深入分析 Android BroadcastReceiver (四)

文章目录 深入分析 Android BroadcastReceiver (四)1. 广播接收器的深入优化与应用1.1 实时性要求高的应用1.1.1 示例&#xff1a;音乐播放器中处理耳机插拔事件1.1.2 动态注册接收器 1.2 处理耗时操作1.2.1 示例&#xff1a;使用 IntentService 处理耗时操作 1.3 安全性管理1.…

【JAVA】SpringBoot + skywalking 将接口的入参、出参、异常等信息上报到skywalking 链路追踪服务器上

【JAVA】SpringBoot skywalking 将接口的入参、出参、异常等信息上报到skywalking 链路追踪服务器上 1.下载SkyWalking APM https://skywalking.apache.org/downloads/ jdk8 不支持 SkyWalking APM 9.3.0以上版本&#xff0c;所以这里我们下载 9.3.0版本 2.下载 Java Agent …

网络安全复习笔记

概述 要素 CIA&#xff1a;可用性&#xff1b;完整性&#xff1b;保密性。 可控性&#xff1b;不可否认性&#xff1b;可审查性。 攻击 被动&#xff1a;窃听 - 保密性&#xff1b;监听 - 保密性主动&#xff1a;假冒 - 完整性&#xff1b;重放 - 完整性&#xff1b;改写 -…

20212416 2023-2024-2 《移动平台开发与实践》综合实践

移动平台开放综合实践 1.实验内容2.实验过程2.1 确定基础功能2.2 设计UI界面2.3 编写程序运行代码2.4 在基本功能的基础上丰富功能 3. 代码分析3.1设置按钮的点击事件监听器3.2 比分更新模块3.3 比分存储模块 4. 运行结果5.实践中遇到的问题及解决6.学习感悟与思考参考资料 1.实…

C++ 教程 - 05 构建编译

文章目录 构建工具cmake安装与使用CMakeLists.txt编写使用案例 构建工具 cmake, Cross Platform Make&#xff0c; &#xff08;对C&#xff09;跨平台编译工具&#xff0c;将CMakeLists.txt 文件编译为对应的文件&#xff0c;如linux下的 Makefile&#xff0c;然后使用make命…

【数据结构与算法(C语言)】离散事件模拟- 单链表和队列的混合实际应用

目录 1. 前言2. 流程图3. 数据结构3.1 单链表3.2 链式队列 4. 核心函数4.1 银行业务模拟 void BankSimulation()4.2 初始化 void OpenForDay()4.3 客户到达 void CustomerArrived(Event en)4.4 客户离开 void CustomerArrived(Event en) 5. 非核心函数5.1 新建客户 NewCustomer…

手机天线都去哪里了?

在手机的演变历程中&#xff0c;天线的设计和位置一直是工程师们不断探索和创新的领域。你是否好奇&#xff0c;现在的手机为什么看不到那些曾经显眼的天线了呢&#xff1f; 让我们一起揭开这个谜题。 首先&#xff0c;让我们从基础开始&#xff1a;手机是如何发出电磁波的&…

云手机在跨平台兼容性方面优势明显?有何应用场景

跨平台设备间无缝切换和数据同步的需求现在是很多人或者企业都需要的&#xff0c;云手机在这些方面似乎有很大优势&#xff1f;下面我们来具体探讨在兼容方面&#xff0c;云手机有何出彩之处&#xff1f;又支持哪些应用场景呢 先来说说云手机跨平台兼容性优势所在&#xff0c;要…

web端使用高德地图

web端使用高德地图 一、申请高德key和秘钥二、在项目中引入所需功能js、css文件三、实现地图选点、回显选点四、自定义地图私密限制 一、申请高德key和秘钥 申请高德key 申请成功后可以得到key 二、在项目中引入所需功能js、css文件 <script src"https://webapi.am…

大模型网信办备案全网最详细说明(附附件)

本文共分为以下几个章节 一、大模型算法备案的强制性 二、生成式人工智能(大语言模型)安全评估要点 三、大模型备案必备材料重点说明 四、大模型备案填报流程 五、大模型备案时间成本对比 六、备案建议 附录、过程性材料 一、大模型算法备案的强制性 1、强制要求备案 …

JMeter详解

一、线程组 作用:线程组就是控制Imeter用于执行测试的一组用户 位置:右键点击测试计划’-->添加 -->线程(用户)--> 线程组 特点: 模拟多人操作线程组可以添加多个&#xff0c;多个线程组可以并行或串行取样器(请求)和逻辑控制器必须依赖线程组才能使用线程组下可以…

ECM和MEMS技术在心肺声学监测中的应用

心肺疾病是全球范围内导致死亡的主要原因。因此&#xff0c;对这些疾病迹象的准确和快速评估对于为患者提供适当的医疗保健至关重要。心血管疾病最重要的迹象之一是心脏周期的异常。大多数呼吸系统疾病则表现为呼吸周期的异常。有多种方法可以监测心脏和肺部的周期。听诊是监测…

【面试干货】Java中的访问修饰符与访问级别

【面试干货】Java中的访问修饰符与访问级别 1、public2、protected3、默认&#xff08;没有访问修饰符&#xff09;4、private &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 在Java中&#xff0c;访问修饰符用于控制类、变量、方法和构造器…

blender 快捷键 常见问题

一、快捷键 平移视图&#xff1a;Shift 鼠标中键旋转视图&#xff1a;鼠标中键缩放视图&#xff1a;鼠标滚动框选放大模型&#xff1a;Shift B 二、常见问题 问题&#xff1a;导入模型成功&#xff0c;但是场景中看不到。 解决办法&#xff1a;视图-裁剪起点&#xff0…

“Docker入门指南:概念与安装详解“

目录 # 概念 1. Docker常见问题 2. docker概念和安装 2.1 Docker的组成 2.2 Docker 组件及关系表 2.3 docker核心思想 2.4 docker镜像与容器两个核心概念 2.5 容器概念图 2.6 docker核心技术 2.6.1 镜像 (Image) 概述 关系 示例 2.6.2 容器 (Container) 概述 关…

贪吃蛇——c语言版

文章目录 演示效果实现的基本功能技术要点源代码实现功能GameStart打印欢迎界面和功能介绍绘制地图创建蛇创建食物 GameRun打印提示信息蛇每走一步 GameEnd蛇死亡后继续游戏 演示效果 贪吃蛇1.0演示视频 将终端应用程序改为控制台主机 实现的基本功能 贪吃蛇地图绘制蛇吃食物的…

[Mysql] 数据库基本概念

前言---数据库系统发展史 当今主流数据库介绍 一、操作系统 Linux操作系统 &#xff1a;RedHat CentOS Debian Ubuntu OpenSUSE 信创标准 会让系统逐渐国产化 国产系统&#xff1a;华为 欧拉 阿里 龙蜥 腾讯 tencentOS 银河麒麟 中标麒麟…