【RocketMQ系列四】消息示例-简单消息的实现

news2025/1/11 14:14:52

您好,我是码农飞哥(wei158556),感谢您阅读本文,欢迎一键三连哦
💪🏻 1. Python基础专栏,基础知识一网打尽,9.9元买不了吃亏,买不了上当。 Python从入门到精通
😁 2. 毕业设计专栏,毕业季咱们不慌忙,几百款毕业设计等你选。
❤️ 3. Python爬虫专栏,系统性的学习爬虫的知识点。9.9元买不了吃亏,买不了上当 。python爬虫入门进阶
❤️ 4. Ceph实战,从原理到实战应有尽有。 Ceph实战
❤️ 5. Java高并发编程入门,打卡学习Java高并发。 Java高并发编程入门

文章目录

    • 1. 前言
    • 2. 同步消息(生产者)
        • 2.1. 测试代码
    • 3. 消费者
    • 4. 异步消息
    • 5. 单向消息
    • 6. 总结

1. 前言

上一篇文章我们介绍了RocketMQ集群的搭建,这篇文章将主要使用RocketMQ测试下简单消息。

2. 同步消息(生产者)

同步消息的话,消费者发布消息之后必须等集群返回成功之后才会发布下一条消息,消息的发布是同步进行的。

2.1. 测试代码
  1. 创建生产者

    	// 1.创建生产者对象
    	DefaultMQProducer defaultMQProducer = new DefaultMQProducer("feige-producer-group");
    
  2. 指定nameserver

    // 2.指定nameServer
    defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");
    

    因为每个nameserver都有所有broker的路由信息,所以只需要指定一个nameserver。

  3. 启动生产者发布消息

    // 3.启动生产者
    		defaultMQProducer.start();
    		//4.创建消息
    		for (int i = 0; i < 100; i++) {
    			// 创建消息,指定topic,以及消息体
    			Message message = new Message("base_topic", ("飞哥测试消息" + i).getBytes());
    			//5.发送消息
    			SendResult send = defaultMQProducer.send(message);
    			System.out.println(send);
    		}
    		// 6.关闭生产者
    		defaultMQProducer.shutdown();
    

创建一个名为 base_topic的topic,虽然集群中还没有这个topic,但是由于前面我们搭建集群的时候指定的可以自动创建topic autoCreateTopicEnable=true 。 然后消息体是:飞哥测试消息xxx。这里打印了集群的响应结果SendResult。

运行结果(部分结果):

SendResult [sendStatus=SEND_OK, msgId=7F0000018D8014DAD5DC0C03E2250000, offsetMsgId=AC1FB85900002A9F00000000001F70B6, messageQueue=MessageQueue [topic=base_topic, brokerName=broker-b, queueId=0], queueOffset=125]
SendResult [sendStatus=SEND_OK, msgId=7F0000018D8014DAD5DC0C03E2410001, offsetMsgId=AC1FB85900002A9F00000000001F71A1, messageQueue=MessageQueue [topic=base_topic, brokerName=broker-b, queueId=1], queueOffset=125]
SendResult [sendStatus=SEND_OK, msgId=7F0000018D8014DAD5DC0C03E24D0002, offsetMsgId=AC1FB85900002A9F00000000001F728C, messageQueue=MessageQueue [topic=base_topic, brokerName=broker-b, queueId=2], queueOffset=125]
SendResult [sendStatus=SEND_OK, msgId=7F0000018D8014DAD5DC0C03E2570003, offsetMsgId=AC1FB85900002A9F00000000001F7377, messageQueue=MessageQueue [topic=base_topic, brokerName=broker-b, queueId=3], queueOffset=125]
SendResult [sendStatus=SEND_OK, msgId=7F0000018D8014DAD5DC0C03E2600004, offsetMsgId=AC1FB85900002A9F00000000001F7462, messageQueue=MessageQueue [topic=base_topic, brokerName=broker-b, queueId=0], queueOffset=126]
SendResult [sendStatus=SEND_OK, msgId=7F0000018D8014DAD5DC0C03E2830005, offsetMsgId=AC1FB85900002A9F00000000001F754D, messageQueue=MessageQueue [topic=base_topic, brokerName=broker-b, queueId=1], queueOffset=126]
SendResult [sendStatus=SEND_OK, msgId=7F0000018D8014DAD5DC0C03E28C0006, offsetMsgId=AC1FB85900002A9F00000000001F7638, messageQueue=MessageQueue [topic=base_topic, brokerName=broker-b, queueId=2], queueOffset=126]
SendResult [sendStatus=SEND_OK, msgId=7F0000018D8014DAD5DC0C03E2970007, offsetMsgId=AC1FB85900002A9F00000000001F7723, messageQueue=MessageQueue [topic=base_topic, brokerName=broker-b, queueId=3], queueOffset=126]
    

这里SendResult 返回结果有几个属性需要说明下:

  1. sendStatus: 发送状态
  2. msgId:消息ID,每个消息都是唯一的
  3. offsetMsgId:偏移消息ID,在队列里的消息唯一ID
  4. messageQueue:用于指定当前这条消息落到哪个队列中,在搭建集群的时候指定一个broker有4个messageQueue。
  5. topic:当前队列所属的主题
  6. brokerName:当前队列所属的broker
  7. queueId:当前队列在broker中序号
  8. queueOffset:当前消息在队列里的偏移量。

从打印的结果可以看出,目前这100条消息是轮流的发送到broker-b中的4个队列中的。关系如下图所示:

image-20230927223810119

3. 消费者

  1. 创建消费者&指定nameserver

    	// 1.创建消费者
    		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
    		// 2.指定连接nameServer
    		consumer.setNamesrvAddr("172.31.184.89:9876");
    
  2. 订阅一个或者多个topic,这里指定消费base_topic,不做过滤。

    // 3.订阅一个或者多个topic,这里指定消费base_topic,不做过滤
    consumer.subscribe("base_topic", "*");
    
  3. 创建一个回调函数&处理消息

    		// 4.创建一个回调函数
    	consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    			// 5.处理消息
    			for (MessageExt msg : msgs) {
    				System.out.println(msg);
    				System.out.println("收到的消息内容:" + new String(msg.getBody()));
    			}
    			// 返回消费成功的对象
    			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    		});
    

    创建一个回调监听函数,它是一个长轮询,当有消息产生时,它会监听到并进行消费(ps: broker会把消息推送给消费者)。

  4. 启动消费者

    	// 6.启动消费者
    		consumer.start();
    		System.out.println("消费者已经启动");
    

    运行结果(部分截图):

    image-20231003082615858

4. 异步消息

异步消息与同步消息的区别就是异步消息不需要等待集群返回发送成功的标识,即可发送下一条消息。主要是发送消息阶段有区别。其他的与同步消息相同。

// 异步消息发送失败重试次数
		defaultMQProducer.setRetryTimesWhenSendAsyncFailed(0);

		CountDownLatch2 countDownLatch2 = new CountDownLatch2(100);
		// 4.创建消息
		for (int i = 0; i < 100; i++) {
			// 创建消息,指定topic,以及消息体
			Message message = new Message("base_topic", "TagA", "feige", ("飞哥异步消息测试" + i).getBytes());
			// 5.发送消息
			int index = i;
			defaultMQProducer.send(message, new SendCallback() {
				@Override
				public void onSuccess(SendResult sendResult) {
					countDownLatch2.countDown();
					System.out.printf("%-10d ok,%s,%n", index,sendResult.getMsgId());
				}

				@Override
				public void onException(Throwable e) {
					countDownLatch2.countDown();
					System.out.printf("%-10d fail,%s,%n", index, e);
					e.printStackTrace();
				}
			});

		}
		System.out.println("=====================");
		countDownLatch2.await(10, TimeUnit.SECONDS);

异步消息在调用send方法的时候,需要实现SendCallback 接口。此函数有 onSuccess 方法和onException 方法。onSuccess 方法在消息发送成功的时候会被集群调用,而onException方法则是在消息发送失败的时候被调用。

5. 单向消息

单向消息只管发送不管接收。

//4.创建消息
		for (int i = 0; i < 100; i++) {
			// 创建消息,指定topic,以及消息体
			Message message = new Message("base_topic", ("飞哥同步消息测试:" + i).getBytes());
			//5.发送消息
		    defaultMQProducer.sendOneway(message);
		}

6. 总结

本文详细介绍了简单消息里的同步消息,异步消息和单向消息。他们的区别主要是生产者发布消息时的区别。另外,简单消息的消费是没有顺序的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1100039.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

十五届蓝桥选拔赛Scratch-2023.08.20STEMA测评试题解析

2023年8月20日举行的第15届蓝桥杯STEMA测评Scratch编程中级组 T2 飞驰的高铁 具体要求: 1). 点击绿旗,角色、背景如图所示; 2). 按下一次数字1按键之后,画面中的景色持续向左侧水平移动(参照程序演示视频); 3). 按下一次数字2按键之后,程序结束。 评判标准: 5分:…

“岗课赛证”融通的物联网综合实训室建设方案

一、概述 随着5G技术的普及应用和产业经济的革新发展,物联网产业所呈现的广阔前景带来了对创新型技术技能人才的迫切需求。高职院校物联网专业建设也因此转变为面向国家战略性新兴产业发展需求。当前,“岗位课程竞赛证书”融通的培育理念,是高职院校物联网人才培养和专业优化的…

2023年中国商业版服务器操作系统市场发展规模分析:未来将保持稳定增长[图]

服务器操作系统一般指的是安装在大型计算机上的操作系统&#xff0c;比如Web服务器、应用服务器和数据库服务器等&#xff0c;是企业IT系统的基础架构平台&#xff0c;也是按应用领域划分的三类操作系统之一。同时服务器操作系统也可以安装在个人电脑上。 服务器操作系统分类 …

OpenAI将发布DALL·E3,多模态输出模式引爆热点

OpenAI在官网宣布&#xff0c;在今年10月份将通过API向ChatGPT Plus和企业版用户提供全新文本生成图片产品——DALLE 3。 OpenAI在去年 4 月推出了DALL・E 2 &#xff0c;时隔一年DALLE 3即将上线&#xff0c;OpenAI 表示&#xff0c;「DALL・E 3 比以往系统更能理解细微差别和…

17 - 并发容器的使用:识别不同场景下最优容器

在并发编程中&#xff0c;我们经常会用到容器。今天我要和你分享的话题就是&#xff1a;在不同场景下我们该如何选择最优容器。 1、并发场景下的 Map 容器 假设我们现在要给一个电商系统设计一个简单的统计商品销量 TOP 10 的功能。常规情况下&#xff0c;我们是用一个哈希表…

每天五分钟机器学习:如何解决欠拟合问题

本文重点 欠拟合是机器学习中常见的问题之一,指的是模型无法很好地拟合训练数据,导致预测结果的误差较大。欠拟合问题一般是由于模型过于简单或者训练数据过少导致的。下面将详细介绍如何解决欠拟合问题。 增加模型复杂度 1. 增加模型的层数:对于神经网络模型,可以增加隐…

1.SpringSecurity -快速入门、加密、基础授权

SpringSecurity简介 文章目录 SpringSecurity简介一、基本概念1.1 认证&#xff08;Authentication&#xff09;方式1.2 会话(Session)介绍1.3 授权(Authorization)介绍1.4 RBAC 二、SpringSecurity入门2.1 快速入门2.1.1 Maven坐标2.1.2 接口2.1.3 源码 2.2 配置文件配置用户名…

【技巧】如何设置Excel表只输入固定内容?

如果你需要在Excel表格中输入固定的内容&#xff0c;可以设置“限制录入内容”&#xff0c;这样就只能输入设置好的内容&#xff0c;避免不小心输入错误信息。下面来看看如何设置吧。 首先&#xff0c;打开Excel表格后&#xff0c;选中需要输入固定内容的表格区域。 比如图片…

配置hpa后,target显示<unknown>/50%

背景&#xff1a; 有两个服务&#xff0c;server 负责主要后端请求&#xff0c;bill 负责计量计费请求。服务都是使用 helm 部署。测试提了一个缺陷&#xff0c;说全部服务没有配置hpa。 解决一 按照之前的代码结构添加了hpa后&#xff0c;发现&#xff1a; ➜ kubectl get…

postman如何使用md5 、base64加密传参

使用CryptoJS库 什么是CryptoJS&#xff1f; CryptoJS是一个纯JavaScript实现的加密库&#xff0c;提供了很多常见的加密算法和加密模式&#xff0c;例如AES、DES、TripleDES、MD5、SHA-1、SHA-256等。它支持的加密方式很全面&#xff0c;使用简便&#xff0c;而且在前端中使用…

MES系统作业调度

一、MES系统作业调度的概念和功能 作业调度是指在制造过程中&#xff0c;根据生产计划和实际情况&#xff0c;合理安排和调度各项任务和资源&#xff0c;以达到最佳的生产效率和资源利用率。MES系统作业调度功能涉及以下方面&#xff1a; 1. 任务计划与分配&#xff1a;MES系…

装配体的模态分析-SOLIDWORKS 2024新功能

修复线性或圆形零部件阵列中缺失的参考 您可以在线性零部件阵列和圆形零部件阵列中修复缺失的方向参考。 对于线性零部件阵列&#xff0c;SOLIDWORKS 通过在零部件上选择参考来修复缺失的方向参考&#xff08;所选参考与 缺失的参考具有相同的类型和方向&#xff0c;而且所选参…

8-k8s-污点与容忍

文章目录 一、概念二、相关操作三、实操污点NoSchedule四、实操污点NoExecute五、实操容忍 一、概念 污点与容忍 污点taints定义在节点之上的键值型属性数据。当节点被标记为有污点&#xff0c;那么意味着不允许pod调度到该节点。 容忍tolerations是定义在 Pod对象上的键值型属…

深度学习——卷积神经网络(CNN)基础二

深度学习——卷积神经网络&#xff08;CNN&#xff09;基础二 文章目录 前言三、填充和步幅3.1. 填充3.2. 步幅3.3. 小结 四、多输入多输出通道4.1. 多输入通道4.2. 多输出通道4.3. 11卷积层4.4. 小结 总结 前言 上文对卷积有了初步的认识&#xff0c;其实卷积操作就是通过卷积…

《开箱元宇宙》:《福布斯》如何通过 Web3 改进讲故事的方式

你们是否想知道 The Sandbox 如何融入世界上最具标志性的品牌和名人的战略&#xff1f;在本期《开箱元宇宙》系列中&#xff0c;我们与《福布斯》一起探讨了他们为何决定在 The Sandbox 中尝试 Web3&#xff0c;以及他们如何改变讲故事的方式&#xff0c;以便在一次体验中吸引超…

C++QT---QT-day1

/*************************登陆窗口制作***************************/ #include "mywindow.h"MyWindow::MyWindow(QWidget *parent): QMainWindow(parent) {this->resize(500,350);this->setWindowTitle("登陆界面");this->setWindowIcon(QIcon(…

【特纳斯电子】基于单片机的火灾监测报警系统-实物设计

视频及资料链接&#xff1a;基于单片机的火灾监测报警系统-实物设计 - 电子校园网 (mcude.com) 编号&#xff1a; T0152203M-SW 设计简介&#xff1a; 本设计是基于单片机的火灾监测报警系统&#xff0c;主要实现以下功能&#xff1a; 1.通过OLED显示温度、烟雾、是否有火…

Leetcode—136.只出现一次的数字【简单】

2023每日刷题&#xff08;二&#xff09; Leetcode—136.只出现一次的数字 位运算法 实现代码 int singleNumber(int* nums, int numsSize){int i 0;int res 0;for(; i < numsSize; i) {res ^ nums[i];}return res; }运行结果 之后我会持续更新&#xff0c;如果喜欢我的…

【网络安全 --- win10系统安装】win10 系统详细安装过程(提供资源)

一&#xff0c;资源下载 百度网盘镜像下载地址链接&#xff1a; 百度网盘 请输入提取码百度网盘为您提供文件的网络备份、同步和分享服务。空间大、速度快、安全稳固&#xff0c;支持教育网加速&#xff0c;支持手机端。注册使用百度网盘即可享受免费存储空间https://pan.ba…

笔试算法题ACM模式输入输出处理

1. Python input之后得到的全是string类型&#xff0c;数字需要用int(n)进行转换 读取单个数 n int(input()) 读取一串数组&#xff1a; nums [int(n) for n in input().split()] &#xff08;nums是个数组&#xff09; 读取字符串&#xff1a; stringinput().split(…