RocketMq(六)消息传输方式

news2025/1/16 8:00:34

在前面的基础上,

一、消息传输方式:在RocketMQ中,可以通过设置消费组的方式实现消息的广播和点对点传输。

1、默认方式:多个消费者轮询消费,若只有一个消费者则全部消费。通过下面的举例可以看到这其实就是点对点模式。

 (1) 生产者

 @RequestMapping("/sendUser")
    public void sendUser(@RequestBody UserDTO userDTO,int count){
        try{
            String userName = userDTO.getUserName();
            //同步发送多条消息
            for(int i=0;i<=count;i++){
                userDTO.setUserName(userName+i);
                Message msg = new Message(userTopic,userTag, JSON.toJSONString(userDTO).getBytes(StandardCharsets.UTF_8));
                msg.setKeys("key"+i);
                SendResult sendResult = defaultMQProducer.send(msg);
                System.out.println(userDTO.getUserName()+"发送结果:"+sendResult);
            }
}

(2)消费者 :

@Component
public class DefaultMQConsumeListener {
    private static Logger logger = LoggerFactory.getLogger(DefaultMQConsumeListener.class);

    @Value("${mq.groupname}")
    private String groupName;

    @Value("${mq.nameserveraddress}")
    private String nameserveraddress;

    @Value("${mq.user.topic}")
    private String userTopic;

    @Value("${mq.school.topic}")
    private String schoolTopic;

    /**
     * 订阅用户、学校mq
     */
    @PostConstruct
    public void defaultMQProducer(){
        try{
            logger.info("mq producer 配置 start");
            DefaultMQPushConsumer  consumer = new DefaultMQPushConsumer (groupName);
            consumer.setNamesrvAddr(nameserveraddress);
            // 订阅一个或多个Topic,以及Tag来过滤需要消费的消息
            //1、订阅用户消息
            consumer.subscribe(userTopic,"*");
            //consumer.registerMessageListener(new UserListener());
            //2、订阅学校消息
            consumer.subscribe(schoolTopic,"*");
            //consumer.registerMessageListener(new SchoolListener());
            consumer.registerMessageListener(new UserAndSchoolListener());
            //设置消费最大批量消息条数为2
            consumer.setConsumeMessageBatchMaxSize(2);
            consumer.start();
            logger.info("mq producer 配置 end");
        }
        catch (Exception e){
            logger.error("mq consume启动失败,errorMsg={}",e.getMessage(),e);
        }
    }
}
public class UserAndSchoolListener implements MessageListenerConcurrently {
    private static Logger logger = LoggerFactory.getLogger(UserAndSchoolListener.class);

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        try{
            logger.info("{}消息条数:{} ", Thread.currentThread().getName(), list.size());
            for(MessageExt message : list){
                String body = new String(message.getBody(), "UTF-8");
                System.out.println("消息:"+body);
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }catch (Exception e){
            logger.error("接收消息异常{}",e.getMessage(),e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
}

 (3)消费者1:配置同(2),打印处加了个区分

 logger.info("{}消息1条数:{} ", Thread.currentThread().getName(), list.size());
            for(MessageExt message : list){
                String body = new String(message.getBody(), "UTF-8");
                System.out.println("消费者-1 :"+body);
            }

访问http://localhost:8888/mqProviderTest/sendMessage/sendUser?count=6

生产者发送成功

张三0发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000012BB418B4AAC2509B4F0D0000, offsetMsgId=AC1F070900002A9F00000000000709C8, messageQueue=MessageQueue [topic=test-mq-user-topic, brokerName=i-8847E0CB, queueId=1], queueOffset=81]
张三1发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000012BB418B4AAC2509B4F1F0001, offsetMsgId=AC1F070900002A9F0000000000070ACC, messageQueue=MessageQueue [topic=test-mq-user-topic, brokerName=i-8847E0CB, queueId=0], queueOffset=82]
张三2发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000012BB418B4AAC2509B4F260002, offsetMsgId=AC1F070900002A9F0000000000070BD0, messageQueue=MessageQueue [topic=test-mq-user-topic, brokerName=i-8847E0CB, queueId=1], queueOffset=82]
张三3发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000012BB418B4AAC2509B4F2D0003, offsetMsgId=AC1F070900002A9F0000000000070CD4, messageQueue=MessageQueue [topic=test-mq-user-topic, brokerName=i-8847E0CB, queueId=0], queueOffset=83]
张三4发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000012BB418B4AAC2509B4F370004, offsetMsgId=AC1F070900002A9F0000000000070DD8, messageQueue=MessageQueue [topic=test-mq-user-topic, brokerName=i-8847E0CB, queueId=1], queueOffset=83]
张三5发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000012BB418B4AAC2509B4F3F0005, offsetMsgId=AC1F070900002A9F0000000000070EDC, messageQueue=MessageQueue [topic=test-mq-user-topic, brokerName=i-8847E0CB, queueId=0], queueOffset=84]
张三6发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000012BB418B4AAC2509B4F4A0006, offsetMsgId=AC1F070900002A9F0000000000070FE0, messageQueue=MessageQueue [topic=test-mq-user-topic, brokerName=i-8847E0CB, queueId=1], queueOffset=84]

 消费者消费

2023-10-16 15:39:15.633  INFO 22376 --- [-mq-groupname_1] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_1消息条数:1 
消息:{"age":1,"userAccount":"zhangsan","userName":"张三1"}
2023-10-16 15:39:15.639  INFO 22376 --- [-mq-groupname_2] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_2消息条数:1 
消息:{"age":1,"userAccount":"zhangsan","userName":"张三3"}
2023-10-16 15:39:15.658  INFO 22376 --- [-mq-groupname_3] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_3消息条数:1 
消息:{"age":1,"userAccount":"zhangsan","userName":"张三5"}

消费者1消费

2023-10-16 15:39:15.623  INFO 7496 --- [-mq-groupname_1] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_1消息1条数:1 
消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三0"}
2023-10-16 15:39:15.630  INFO 7496 --- [-mq-groupname_2] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_2消息1条数:1 
消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三2"}
2023-10-16 15:39:15.647  INFO 7496 --- [-mq-groupname_3] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_3消息1条数:1 
消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三4"}
2023-10-16 15:39:15.664  INFO 7496 --- [-mq-groupname_4] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_4消息1条数:1 
消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三6"}

多次访问,可以看到两个消费者在轮询消费

2、消息的广播:广播模式下,一个消息可以被多个消费者消费,每个消费者都能够接收到该消息的一个副本。实现消息的广播可以通过设置消费组的方式,将消费组的模式设置为广播模式。

consumer.setMessageModel(MessageModel.BROADCASTING);

在两个消费者的配置中都加上这句

生产者发送成功后,两个消费者打印如下

2023-10-16 16:04:01.886  INFO 8260 --- [-mq-groupname_7] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_7消息条数:1 
消息:{"age":1,"userAccount":"zhangsan","userName":"张三0"}
2023-10-16 16:04:01.893  INFO 8260 --- [-mq-groupname_8] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_8消息条数:1 
消息:{"age":1,"userAccount":"zhangsan","userName":"张三1"}
2023-10-16 16:04:01.905  INFO 8260 --- [-mq-groupname_9] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_9消息条数:1 
消息:{"age":1,"userAccount":"zhangsan","userName":"张三2"}
2023-10-16 16:04:01.923  INFO 8260 --- [mq-groupname_10] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_10消息条数:1 
消息:{"age":1,"userAccount":"zhangsan","userName":"张三3"}
2023-10-16 16:04:01.944  INFO 8260 --- [mq-groupname_11] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_11消息条数:1 
消息:{"age":1,"userAccount":"zhangsan","userName":"张三4"}
2023-10-16 16:04:01.946  INFO 8260 --- [mq-groupname_12] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_12消息条数:1 
消息:{"age":1,"userAccount":"zhangsan","userName":"张三5"}
2023-10-16 16:04:01.955  INFO 8260 --- [mq-groupname_13] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_13消息条数:1 
消息:{"age":1,"userAccount":"zhangsan","userName":"张三6"}
2023-10-16 16:04:01.886  INFO 23564 --- [-mq-groupname_7] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_7消息1条数:1 
消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三0"}
2023-10-16 16:04:01.893  INFO 23564 --- [-mq-groupname_8] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_8消息1条数:1 
消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三1"}
2023-10-16 16:04:01.904  INFO 23564 --- [-mq-groupname_9] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_9消息1条数:1 
消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三2"}
2023-10-16 16:04:01.922  INFO 23564 --- [mq-groupname_10] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_10消息1条数:1 
消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三3"}
2023-10-16 16:04:01.944  INFO 23564 --- [mq-groupname_12] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_12消息1条数:1 
消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三5"}
2023-10-16 16:04:01.944  INFO 23564 --- [mq-groupname_11] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_11消息1条数:1 
消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三4"}
2023-10-16 16:04:01.951  INFO 23564 --- [mq-groupname_13] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_13消息1条数:1 
消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三6"}

3、点对点传输:点对点传输模式下,一个消息只能被一个消费者消费,该消费者消费该消息后,其他消费者将无法再次接收到该消息的副本。实现消息的点对点传输可以通过设置消费组的方式,将消费组的模式设置为集群模式。这里将两个消费者设置如下

consumer.setMessageModel(MessageModel.CLUSTERING);

效果同(1)

4、两个消费者一个设置为点对点,一个设置为广播。如我设置消费者-1为广播式

发送者发送成功后,设置为广播式的全部消费,设置为点对点的轮询消费。

2023-10-16 16:11:45.793  INFO 10852 --- [mq-groupname_14] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_14消息条数:1 
消息:{"age":1,"userAccount":"zhangsan","userName":"张三0"}
2023-10-16 16:11:45.806  INFO 10852 --- [mq-groupname_15] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_15消息条数:1 
消息:{"age":1,"userAccount":"zhangsan","userName":"张三2"}
2023-10-16 16:11:45.821  INFO 10852 --- [mq-groupname_16] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_16消息条数:1 
消息:{"age":1,"userAccount":"zhangsan","userName":"张三4"}
2023-10-16 16:11:45.835  INFO 10852 --- [mq-groupname_17] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_17消息条数:1 
消息:{"age":1,"userAccount":"zhangsan","userName":"张三6"}
2023-10-16 16:11:45.793  INFO 3812 --- [mq-groupname_16] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_16消息1条数:1 
消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三0"}
2023-10-16 16:11:45.801  INFO 3812 --- [mq-groupname_17] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_17消息1条数:1 
消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三1"}
2023-10-16 16:11:45.806  INFO 3812 --- [mq-groupname_18] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_18消息1条数:1 
消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三2"}
2023-10-16 16:11:45.813  INFO 3812 --- [mq-groupname_19] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_19消息1条数:1 
消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三3"}
2023-10-16 16:11:45.821  INFO 3812 --- [mq-groupname_20] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_20消息1条数:1 
消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三4"}
2023-10-16 16:11:45.827  INFO 3812 --- [-mq-groupname_3] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_3消息1条数:1 
消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三5"}
2023-10-16 16:11:45.833  INFO 3812 --- [-mq-groupname_6] com.demo.listener.UserAndSchoolListener  : ConsumeMessageThread_test-mq-groupname_6消息1条数:1 
消费者-1 :{"age":1,"userAccount":"zhangsan","userName":"张三6"}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1099038.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Rockchip 3588 开关HDMI

需求 上层Android要控制hdmi&#xff0c;dp等等(带有节点的功能)的开关&#xff0c;配置&#xff0c;获取状态。 方案 我们可以从Settings的源码中找到方法&#xff1a; 从HdmiSettings.java的源码中可以看到如下注释&#xff1a; 相关节点定义&#xff1a; 控制函数如下&…

云服务器ip使用细节(公网、私有)

场景&#xff1a; 当我们对tcp服务器进行监听的时候&#xff0c;可能需要用到ip地址&#xff0c;比如使用httplib::Service::listen(ip, port)&#xff0c;而当我们访问tcp服务器时也需要ip地址 但这两个ip是不同的&#xff01; 每个云服务器通常都会有一个公网IP地址和一个私有…

Redis删除过期key策略

文章目录 前言Redis中key的的过期时间在创建 key 时使用 EXPIRE 命令设置过期时间(秒级)使用 EXPIREAT 命令设置一个精确的过期时间(unix 时间戳)使用 PEXPIRE 命令设置过期时间(毫秒级)使用 PEXPIREAT 命令设置毫秒级精确过期时间在 Redis 配置文件中设置所有 key 的默认过期时…

C语言--好题

目录 题目一&#xff1a;二维数组传参 题目二&#xff1a; malloc开辟二维数组 题目三&#xff1a; 位段 题目四&#xff1a; 联合体 题目五&#xff1a;位段 题目六&#xff1a;找单身狗2 题目一&#xff1a;二维数组传参 下面代码中print_arr函数参数设计哪个是正确的…

某985证书站挖掘记录

0x1.前言 ​ 本文章仅用于信息安全防御技术分享&#xff0c;因用于其他用途而产生不良后果&#xff0c;作者不承担任何法律责任&#xff0c;请严格遵循中华人民共和国相关法律法规&#xff0c;禁止做一切违法犯罪行为。文中涉及漏洞均以提交至教育漏洞平台&#xff0c;现已修复…

​ModbusTCP转Profibus-DP从站网关把modbus的数据传到300plc上的应用方法​​

远创智控YC-DPS-TCP&#xff0c;让Profibus和ModbusTcp总线之间的通讯协议转换更简单。 远创智控YC-DPS-TCP 是一种将Profibus和ModbusTcp总线连接的通讯协议转换设备。这个设备非常符合ModbusTcp协议的设备&#xff0c;比如各种仪表、传感器、交换机等&#xff0c;它们可以通过…

odoo笔记

后台运行项目 nohup python odoo-bin -c ./debian/odoo.conf & 查看当前odoo进程 &#xff08;更新项目模型类时&#xff0c;1.先重启项目&#xff0c;2.再去网页更新模块&#xff09; ps -fA | grep odoo kill 进程id 删库 /web/database/manager 查找文件夹 find …

linux centos7 环境下 no such file or directory

目录 1.问题描述2.主要原因2.1修改后代码2.2修改前代码 总结参考 1.问题描述 预览excel文件时无法找到对应的html文件 2.主要原因 异常原因&#xff1a;代码获取的是系统的tmp文件&#xff0c;但是linux环境环境中心tmp目录是没有权限的&#xff0c;所以不能获取系统的根目录…

Python如何获取动态加载的数据呢 ?

大家早好、午好、晚好吖 ❤ ~欢迎光临本文章 如果有什么疑惑/资料需要的可以点击文章末尾名片领取源码 例子1&#xff1a;爬取dy电影中的电影详情数据 url:https://movie.douban.com/ 1.什么是动态加载的数据&#xff1a; 我们通过requests模块进行数据爬取无法每次都是可见…

vue使用高德地图轨迹活动效果demo(整理)

在html页面引入您自己的key <script language"javascript" src"https://webapi.amap.com/maps?v1.4.15&key6b26c2c58770d13a4ecf2b96615dbaee"></script><template><div class"index"><div id"amapContain…

什么是粘包和半包问题

什么是粘包和半包问题 粘包 发送的是 ABC和DEF 接收到的是 ABCDEF 半包 发送的是 ABCD 接收到的是 AB 和 CD 为什么会有粘包问题? 因为 TCP 是面向连接的传输协议&#xff0c;它是以“流”的形式传输数据的&#xff0c;而“流”数据是没有明确的开始和结尾边界的&#xff0…

MIT6.5830 Lab1-Go tutorial实验记录(一

MIT6.5830 Lab1-Go tutorial实验记录&#xff08;一&#xff09; – WhiteNights Site 标签&#xff1a;Golang, 数据库 编写一个简单的http server。 前言 MIT数据库系统实验 在网上看到了这么个实验&#xff0c;刚好对golang和数据库比较感兴趣&#xff0c;于是开始跟着做实…

五大亮点探索互联网医院源码的创新应用方式

作为互联网医疗行业的专家&#xff0c;我将为您揭示互联网医院源码的五大创新亮点。随着数字化技术的迅猛发展&#xff0c;互联网医院源码成为了提升医疗服务质量和提供便捷就医体验的重要工具。现在&#xff0c;让我们一起深入探索这五大亮点&#xff0c;了解互联网医院源码在…

什么是人事RPA?人事RPA解决什么问题?人事RPA实施难点在哪里?

每家公司人力资源部门每天需要筛选适合自己公司岗位要求的人才并与之沟通邀约面试、每月全公司员工的考勤状态核对、业绩考核核对、入离职手续办理、新员工培训等等&#xff0c;每项业务流程都由人手操作&#xff0c;效率极低、流程繁琐、费时费力。HR部门每天面对的业务数据量…

基于Java的驾校教练预约管理系统设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09; 代码参考数据库参考源码获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&am…

使用Linux远程连接OpenGauss数据库的步骤和方法

文章目录 前言1. Linux 安装 openGauss2. Linux 安装cpolar3. 创建openGauss主节点端口号公网地址4. 远程连接openGauss5. 固定连接TCP公网地址6. 固定地址连接测试 前言 openGauss是一款开源关系型数据库管理系统&#xff0c;采用木兰宽松许可证v2发行。openGauss内核深度融合…

COMMUTING CONDITIONAL GANS FOR MULTI-MODAL FUSION

方法 C f ^f f是分类器&#xff0c;P f ^f f(o i _i i​)是第i个物体出现的融合概率 作者未公布代码

Sectigo数字证书

安装SSL证书可以提供数据保护、确保身份真实性&#xff0c;并增强用户信任&#xff0c;从而提高网站的安全性和可信度。所以越来越多注重网络安全的技术人员在保障数据安全的手段上&#xff0c;选择直接安装SSL证书。 其中Sectigo的SSL证书是全球SSL证书市场占有率最高的CA公司…

电脑打开图片比例太大怎么调?这个方法又快又好用

别人给我们发送的图片&#xff0c;有许多打开之后提示图片太大&#xff0c;这种情况改怎么处理呢&#xff1f;其实可以使用图片改大小&#xff08;https://www.yasuotu.com/size&#xff09;功能来对图片尺寸大小修改就可以了&#xff0c;接下来就分享一个批量修改图片尺寸的方…

牛奶配送经营商城小程序的作用是什么

牛奶是老少皆爱喝的饮品&#xff0c;市场呈现大品牌多区域拓展形式&#xff0c;中小企业在经营中较为受限制&#xff0c;传统线下超市铺货占领的方式已经变得很低效&#xff0c;线下直营店和经销商在实际经营中面临难题。 线下经营受困&#xff0c;线上经营成为众商家扩大生意…