手搭手RocketMQ发送消息

news2025/1/12 20:46:18

消息中间件的对比

消息中间件

ActiveMQ

RabbitMQ

RocketMQ

kafka

开发语言

java

erlang

java

scala

单击吞吐量

万级

万级

10万级

10万级

时效性

ms

us

ms

ms

可用性

高(主从架构)

高(主从架构)

非常高(主从架构)

非常高(主从架构)

消息中间件: activeMQ:java(jms协议),性能一般,吞吐量低。rabbitMQ:erlang(amqp协议),性能好,功能丰富,吞吐量一般。rocketMQ:java,性能好,吞吐量丰富,功能丰富。Kafka: scala,吞吐量最大,功能单一,大数据领域

RocketMQ 是阿里开源的分布式消息中间件,跟其它中间件相比,RocketMQ 的特点是纯JAVA实现,是一套提供了消息生产,存储,消费全过程API的软件系统。


RocketMQ的作用:数据收集、限流削峰、异步解耦
数据收集
分布式系统会产生海量级数据流,如:业务日志、监控数据、用户行为等。针对这些数据流进行实时或批量采集汇总,然后对这些数据流进行大数据分析,这是当前互联网平台的必备技术。通过MQ完成此类数据收集是最好的选择。

限流削峰
MQ可以将系统的超量请求暂存其中,以便系统后期可以慢慢进行处理,从而避免了请求的丢失或系统被压垮。

异步解耦
上游系统对下游系统的调用若为同步调用,则会大大降低系统的吞吐量与并发度,且系统耦合度太高、而异步调用则会解决这些问题。所以两层之间若要实现由同步到异步的转化,一般性做法就是,在这两层间添加一个MQ层。

rocketmq.apache.org

Broker:经纪人(经理人)

Topic主题:消息区分,分类,虚拟结构

Queue:消息队列

Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

发送消息

发送同步消息

同步消息发送后会用一个返回值,也就是MQ服务器接收到消息返回的一个确认,这种方式非常安全,但是性能就没那么高,而在MQ集群中,也是要等到所有的从机都复制了消息以后才会返回,这种方式适合重要消息的场景

@Test
void rocketmqProducerTest() throws Exception {
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
Message message = new Message("Topic","消息".getBytes());
SendResult send = producer.send(message);
System.out.println("发送状态"+send.getSendStatus());
//关闭生产者
producer.shutdown();
}

@Test
void rocketmqConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("Topic","*");

//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理

for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}

发送异步消息

异步消息常用在对响应时间敏感的业务场景,发送端不能容忍长时间等待Broker的响应。发送完后会有一个异步消息通知

@Test
void aysncProducerTest() throws Exception {
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
Message message = new Message("aysncTopic","异步消息".getBytes());
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功"+sendResult);
}

@Override
public void onException(Throwable throwable) {
System.err.println("发送失败"+throwable);
}
});
System.out.println("执行了");
//关闭生产者
//producer.shutdown();
//挂起当前jvm
System.in.read();
}

@Test
void aysncConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("aysncTopic","*");

//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理

for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}

发送单向消息

单向消息发送这种方式不关心发送结果的场景,这种方式吞吐量大,但存在消息丢失的风险。使用案例:日志信息发送

@Test
void OnewayProducerTest()throws Exception{
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("Oneway");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
Message message = new Message("Oneway","单向消息

".getBytes());
//发送消息
producer.sendOneway(message);
//关闭生产者
producer.shutdown();
}

@Test
void OnewayConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("Oneway","*");

//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理

for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}

发送延时消息

发送延时消息,顾名思义。场景:比如淘宝商城下单后,并未支付,有30分钟未支付订单状态

@Test
void delayProducerTest()throws Exception{
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("delayGroup");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
Message message = new Message("delay","延迟消息".getBytes());
//设置延时 根据官方延时等级
message.setDelayTimeLevel(2);
//发送消息
producer.sendOneway(message);
//关闭生产者
producer.shutdown();
}

@Test
void delayConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("delay","*");

//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理

for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}

发送批量消息

批量消息:可以一次性发送一组消息

@Test
void delayProducerTest()throws Exception{
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("delayGroup");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
//批量消息
List<Message> messages = Arrays.asList(
new Message("delay","批量消息1".getBytes()),
new Message("delay","批量消息2".getBytes()),
new Message("delay","批量消息3".getBytes())
);
//发送消息
producer.send(messages);
//关闭生产者
producer.shutdown();
}

发送带标签消息

RocketMQ提供消息过滤功能,可根据业务逻辑区分,带有A标签的被A消费,带有B标签的被B消费

@Test
void TagProducerTest()throws Exception{
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("TagGroup");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
//批量消息
Message message1 = new Message("tagTopic", "tagA", "tag标签内容A".getBytes());
Message message2 = new Message("tagTopic", "tagB", "tag标签内容B".getBytes());
//发送消息
producer.send(message1);
producer.send(message2);
//关闭生产者
producer.shutdown();
}

@Test
void TagAConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("tagTopic","tagA");

//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理

for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}

@Test
void TagBConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("tagTopic","tagB");

//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理

for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}

 发送顺序消息

发送的消息要保证消息是一定有序的,顺序消息,发送到同一个队列

实体类

@Data
@AllArgsConstructor
public class MessageM {
private int userID;
private String desc;
}

顺序消息,发送到同一个队列

private List<MessageM> messageMs = Arrays.asList(
new MessageM(1,"下单"),
new MessageM(1,"付款"),
new MessageM(1,"配送"),
new MessageM(2,"下单"),
new MessageM(2,"付款"),
new MessageM(2,"配送")
);

@Test
void orderProducerTest() throws Exception {
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//连接namesrv
producer.setNamesrvAddr("192.168.211.131:9876");
//启动
producer.start();

messageMs.forEach(messageM -> {
//创建消息
Message message = new Message("orderMsg",messageM.toString().getBytes());
//发送顺序消息,发送到同一个队列
try {
//相同的userID去相同的队列
producer.send(
message,
new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
//选择队列
int hashCode = o.toString().hashCode();
int i = hashCode % list.size();
return list.get(i);
}
},
messageM.getUserID());
} catch (MQClientException e) {
throw new RuntimeException(e);
} catch (RemotingException e) {
throw new RuntimeException(e);
} catch (MQBrokerException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
//关闭生产者
producer.shutdown();
}

@Test
void orderConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.211.131:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("orderMsg","*");

//设置监听器(一直,异步回调方式)
//MessageListenerConcurrently 并发模式,多线程
//MessageListenerOrderly 顺序模式,单线程
consumer.registerMessageListener(new MessageListenerOrderly() {
//消费方法
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
System.out.println("当前线程ID"+Thread.currentThread().getId());
return ConsumeOrderlyStatus.SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1514172.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

mybatisplus使用基本步骤

1.设计实体类&#xff0c;给实体类加一些需要的注解 2.创建Service接口继承mybatisplus提供的 IService<实体类>接口 3.实现Service接口&#xff0c;并继承mybatisplus提供的 ServiceImpl<mapper接口&#xff0c;实体类>类 4.mapper接口继承mybatisplus提供的 B…

java020 - Java集合进阶

1、集合知识回顾 1.1 集合特点 提供了一种储存空间可变的储存模型&#xff0c;储存的数据容量随时可以发生改变。 1.2 集合类体系结构 单列集合和双列集合&#xff1a; 单列集合中&#xff1a;list和set区别&#xff08;数据是否重复&#xff09; 区分接口和实现类&#…

【零基础学习04】嵌入式linux驱动中信号量功能基本实现

大家好,为了进一步提升大家对实验的认识程度,每个控制实验将加入详细控制思路与流程,欢迎交流学习。 今天给大家分享一下,linux系统里面信号量操作的具体实现,操作硬件为I.MX6ULL开发板。 第一:信号量基本简介 信号量是同步的一种方式,linux内核也提供了信号量…

【数据结构】二叉树OJ题目

965. 单值二叉树 如果二叉树每个节点都具有相同的值&#xff0c;那么该二叉树就是单值二叉树。 只有给定的树是单值二叉树时&#xff0c;才返回 true&#xff1b;否则返回 false。 示例 1&#xff1a; 输入&#xff1a;[1,1,1,1,1,null,1] 输出&#xff1a;true示例 2&#x…

链路聚合实验(思科)

华为设备参考&#xff1a; 一&#xff0c;技术简介 网络设备的链路聚合技术&#xff08;Link Aggregation&#xff09;是一种将多个物理链路捆绑在一起&#xff0c;形成一个逻辑链路的技术。这样做可以增加带宽、提高可靠性和实现负载均衡。 二&#xff0c;实验目的 橙色的阻…

基于YOLOv8/YOLOv7/YOLOv6/YOLOv5的条形码二维码检测系统(深度学习+UI界面+训练数据集+Python代码)

摘要&#xff1a;在物流和制造业中&#xff0c;开发一套高效的条形码与二维码识别系统显得尤为关键。本博文深入探讨了如何利用深度学习技术打造出一套先进的条形码及二维码检测系统&#xff0c;并且提供了一套完整的实施方案。该系统搭载了性能卓越的YOLOv8算法&#xff0c;并…

基于Ambari搭建大数据分析平台

一、部署工具简介 1. Hadoop生态系统 Hadoop big data ecosystem in Apache stack 2. Hadoop的发行版本 Hadoop的发行版除了Apache的开源版本之外&#xff0c;国外比较流行的还有&#xff1a;Cloudera发行版(CDH)、Hortonworks发行版&#xff08;HDP&#xff09;、MapR等&am…

xss.pwnfunction.com靶机 Warmups

通关要求弹出警告框alert(1337) 没有用户交互 不能使用外链接 在chrome中测试 Ma Spaghet! 通过分析代码我们可以看到它直接用innerHTML将接收的内容赋值 但是我们不能使用<script>标签因为&#xff1a;HTML 5 中指定不执行由 innerHTML 插入的 <script> 标签。 所…

读书笔记之《机器与人》:AI如何重构工作方式和流程?

《机器与人: 埃森哲论新人工智能》作者是【美】保罗•多尔蒂和詹姆斯•威尔逊 &#xff0c;原作名: Human Machine: Reimagining Work in the Age of AI&#xff0c;2018年出版。 保罗•多尔蒂&#xff08;PAUL DAUGHERTYH&#xff09;&#xff1a;埃森哲首席技术官和创新官、…

Spring MVC中的REST风格

文章目录 REST风格1 REST简介问题导入1.1 REST介绍1.2 RESTful介绍1.3 注意事项 2 RESTful入门案例问题导入2.1 快速入门2.2 PathVariable介绍2.3 RequestBody、RequestParam、PathVariable区别和应用 3 REST快速开发【重点】3.1 代码中的问题3.2 Rest快速开发 4案例&#xff1…

springboot的maven多模块如何混淆jar包

springboot的maven多模块如何混淆jar包 一.简介二. 示例2.1 基本配置2.2 结果 三. 错误3.1 错误13.2 错误2 四. 参考文章 前言 这是我在这个网站整理的笔记,有错误的地方请指出&#xff0c;关注我&#xff0c;接下来还会持续更新。 作者&#xff1a;神的孩子都在歌唱 一.简介 …

【node】模块化与包(二)

1、模块化的基本概念 模块化是指解决一个复杂的问题时&#xff0c;自顶向下逐层把系统划分成若干模块的过程。对于整个系统来说&#xff0c;模块是可组合、分解和更换的单元。 &#xff08;1&#xff09;模块化的优点 遵循固定规则&#xff0c;把大文件拆分成对立并相互依赖…

Python使用FastAPI提供图片缩略图生成接口

使用pillow的thumbnail生成缩略图时&#xff0c;会保持原图的宽高比&#xff1b;使用的opencv的resize则不会 具体代码如下&#xff1a; #!/usr/bin/env python import re import sys from enum import Enum from io import BytesIO from pathlib import Path from typing im…

蓝桥杯 2022 dp 背包

蓝桥杯 2022 dp 背包 题目链接&#xff1a; https://www.lanqiao.cn/problems/2186/learning/?subject_code1&group_code4&match_num13&match_flow2&origincup 题目&#xff1a; 代码&#xff1a; #include<bits/stdc.h> using namespace std;#defi…

王道机试C++第6章 数学问题和22年蓝桥杯省赛选择题Day34

6.1 进制转换 二进制数&#xff08;十转二&#xff09; 习题描述 大家都知道&#xff0c;数据在计算机里中存储是以二进制的形式存储的。 有一天&#xff0c;小明学了C语言之后&#xff0c;他想知道一个类型为unsigned int 类型的数字&#xff0c;存储在计算机中的二进制串是…

多数问题求解之蒙特卡洛与分治法

多数问题&#xff08;Majority Problem&#xff09;是一个有多种求解方法的经典问题&#xff0c;其问题定义如下&#xff1a; 给定一个大小为 n n n的数组&#xff0c;找出其中出现次数超过 n / 2 n/2 n/2的元素 例如&#xff1a;当输入数组为 [ 5 , 3 , 5 , 2 , 3 , 5 , 5 ] […

JavaEE:网络编程

网络编程&#xff1a;通过代码完成基于网络的跨主机通信 跨主机通信方式&#xff1a; 1.TCP/IP网络 2.蓝牙通信 3.近场通信NFC 4.毫米波通信&#xff1a;功率高&#xff0c;带宽高&#xff0c;抗干扰能力差 其中TCP/IP网络是日常编程中最常涉及到的&#xff0c;最通用的跨主机通…

深度学习进阶:揭秘强化学习原理,实战应用全解析!

作为机器学习领域的一大分支&#xff0c;强化学习以其独特的学习方式吸引了众多研究者和实践者的目光。强化学习&#xff0c;顾名思义&#xff0c;是通过不断地强化与环境的交互来优化决策策略。在这个过程中&#xff0c;智能体通过试错&#xff0c;根据环境给出的奖励信号来调…

Linux:导出环境变量命令export

相关阅读 Linuxhttps://blog.csdn.net/weixin_45791458/category_12234591.html?spm1001.2014.3001.5482 Linux中的内建命令export命令用于创建一个环境变量&#xff0c;或将一个普通变量导出为环境变量&#xff0c;并且在这个过程中&#xff0c;可以给该环境变量赋值。 下面…

产品测试方案:视频接入平台并发性能测试方案和报告(即150路视频并发流媒体服务器模块的性能测试方案和报告)

目 录 一、测试目的&#xff1a; 二、测试方案&#xff1a; 2.1、测试思路 2.2、拓扑图 三、测试环境 3.1 服务器配置 3.2 网络摄像机列表 3.3 测试软件 四、测试流程 4.1 H.264并发测试&#xff1a; 4.1.1老版本srsout3.10并发测试 4.1.2 新版本srsout…