JAVA开发( 腾讯云消息队列 RocketMQ使用总结 )

news2024/11/24 3:45:49

一、问题背景

      之所以需要不停的总结是因为在java开发过程中使用到中间件实在太多了,久久不用就会慢慢变得生疏,有时候一个中间很久没使用,可能经过了很多版本的迭代,使用起来又有区别。所以还是得不断总结更新。最近博主就是在使用腾讯云RocketMQ中遇到了点问题,排查了很久,也不知道什么原因,最好咨询了了腾讯官方技术支撑,最终解决。现在很多中间件都是各位巨头经过封装,然后卖给中小企业,有时候遇到点问题,还不容易在网上搜索资料排查到,都只能在巨头的生态里摸索,排查,请教。

二、RocketMQ产品概述

消息队列 RocketMQ 版(TDMQ for RocketMQ,简称 TDMQ RocketMQ 版)是腾讯云基于 Apache RocketMQ 构建的分布式消息中间件,完全兼容 Apache RocketMQ 的各个组件与概念,支持开源社区版本的客户端零改造接入。

消息队列 RocketMQ 版具有低延迟、高性能、高可靠、万亿级消息容量和灵活可扩展等特点,可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

三、RocketMQ的组成部分和基本概念

Producer 集群: 客户侧应用,负责生产并发送消息。

Consumer 集群:客户侧应用,负责订阅和消费处理消息。

Nameserver 集群: 服务端应用,负责路由寻址和 Broker 心跳注册。

心跳注册:NameServer 相当于注册中心的角色,各个角色的机器都要定时向 NameServer 上报自己的状态,如果超时未上报,NameServer 会认为某个机器出现故障不可用了,从而将这个机器从可用列表中删除。

路由寻址:每个 NameServer 中都保存着 Broker 集群的整个路由信息和用于客户端查询的队列信息,生产者和消费者通过 NameServer 去获取整个Broker 集群的路由信息,从而进行消息的投递和消费。

Broker集群:服务端应用,负责接收,存储,投递消息,支持主从多副本模式,从节点可选部署,实际现网公有云上数据高可靠直接依赖云盘三副本。

管控集群: 服务端应用,可视化的管控控制台,负责运维整个集群,例如源数据的收发和管理等。

消息(Message)

消息系统所传输信息的物理载体,生产和消费数据的最小单位。生产者将业务数据的负载和拓展属性包装成消息发送到服务端,服务端按照相关语义将消息投递到消费端进行消费。

主题(Topic)

Topic 表示一类消息的集合,每个主题包含若干消息,是 RocketMQ 进行消息订阅的基本单位。

消息标签(MessageTag)

为消息设置的标签,用于将同一个 Topic 下区分不同类型的消息,可以理解为 Topic 是消息的一级分类,Tag 是消息的二级分类。

消息队列(MessageQueue)

存储消息的物理实体,一个 Topic 可以包含多个 Queue,Queue 也叫消息分区,一个 Queue 中的消息只能被一个消费者组中的一个消费者消费,一个 Queue 中的消息不允许同一个消费者组中的多个消费者同时消费。

消息位点(MessageQueueOffset)​

消息是按到达 RocketMQ 服务端的先后顺序存储在指定主题的多个队列中,每条消息在队列中都有一个唯一的 Long 类型坐标,这个坐标被定义为消息位点。

消费位点(ConsumerOffset)​

一条消息被某个消费者消费完成后不会立即从队列中删除, RocketMQ 会基于每个消费者分组记录消费过的最新一条消息的位点,即消费位点。

消息索引(MessageKey)​

消息索引是 RocketMQ 提供的面向消息的索引属性。通过设置的消息索引可以快速查找到对应的消息内容。

生产者(Producer)​

生产者是 RocketMQ 系统中用来构建并传输消息到服务端的运行实体。生产者通常被集成在业务系统中,将业务消息按照要求封装成消息并发送至服务端。

消费者(Consumer)​

消费者是 RocketMQ 中用来接收并处理消息的运行实体。消费者通常被集成在业务系统中,从服务端获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理。

分组(Group)

可分为生产者组和消费者组:

生产者组:同一类 Producer 的集合,这类 Producer 发送同一类消息且发送逻辑一致。如果发送的是事务消息,且生产者发送后崩溃,则 Broker 服务器会联系同一个生产者组的其他生产者实例以提交或者回溯消费。

消费者组:同一类 Consumer 的集合,这类 Consumer 通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面实现了负载均衡和容错。消费者组的消费者实例必须订阅完全相同的 Topic。

消息类型(MessageType)​

按照消息传输特性的不同而定义的分类,用于类型管理和安全校验。RocketMQ 支持的消息类型有普通消息、顺序消息、事务消息和定时/延时消息。

普通消息

普通消息是一种基础的消息类型,由生产投递到指定 Topic 后,被订阅了该 Topic 的消费者所消费。普通消息的 Topic 中无顺序的概念,可以使用多个分区数来提升消息的生产和消费效率,在吞吐量巨大时其性能最好。

顺序消息

顺序消息是消息队列 RocketMQ 提供的一种高级消息类型,对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发送的消息先消费,后发送的消息后消费。

重试队列

重试队列是一种为了确保消息被正常消费而设计的队列。当某些消息第一次被消费者消费后,没有得到正常的回应,则会进入重试队列,当重试达到一定次数后,停止重试,投递到死信队列中。

由于实际场景中,可能会存在的一些临时短暂的问题(如网络抖动,服务重启等)导致消息无法及时被处理,但短暂时间过后又恢复正常。这种场景下,重试队列的重试机制就可以很好解决此类问题。

死信队列

死信队列是一种特殊的消息队列,用于集中处理无法被正常消费的消息的队列。当消息在重试队列中达到一定重试次数后仍未能被正常消费,TDMQ 会判定这条消息在当前情况下无法被消费,将其投递至死信队列。

实际场景中,消息可能会由于持续一段时间的服务宕机,网络断连而无法被消费。这种场景下,消息不会被立刻丢弃,死信队列会对这种消息进行较为长期的持久化,用户可以在找到对应解决方案后,创建消费者订阅死信队列来完成对当时无法处理消息的处理。

集群消费

集群消费:当使用集群消费模式时,任意一条消息只需要被集群内的任意一个消费者处理即可。适用于每条消息只需要被处理一次的场景。

广播消费

广播消费:当使用广播消费模式时,每条消息会被推送给集群内所有注册过的消费者,保证消息至少被每个消费者消费一次。适用于每条消息需要被集群下每一个消费者处理的场景。

消息过滤​

消费者可以通过订阅指定消息标签(Tag)对消息进行过滤,确保最终只接收被过滤后的消息合集。过滤规则的计算和匹配在 RocketMQ 的服务端完成。

重置消费位点​

以时间轴为坐标,在消息持久化存储的时间范围内,重新设置消费者分组对已订阅主题的消费进度,设置完成后消费者将接收设定时间点之后,由生产者发送到 RocketMQ 服务端的消息。

消息轨迹​

在一条消息从生产者发出到消费者接收并处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。通过消息轨迹,您能清晰定位消息从生产者发出,经由 RocketMQ 服务端,投递给消费者的完整链路,方便定位排查问题。

消息堆积​

生产者已经将消息发送到 RocketMQ 的服务端,但由于消费者的消费能力有限,未能在短时间内将所有消息正确消费掉,此时在服务端保存着未被消费的消息,该状态即消息堆积。

四、应用场景

异步解耦

交易引擎作为腾讯计费最核心的系统,每笔交易订单数据需要被几十个下游业务系统关注,包括库存系统、仓储系统、促销系统、积分系统等,多个系统对消息的处理逻辑不一致,单个系统不可能去适配每一个关联业务。此时,TDMQ RocketMQ 版可解除多个业务系统之间的耦合度,减少系统之间影响,提升核心业务响应速度和健壮性。

削峰填谷

企业不定时举办的一些营销活动,新品发布上线,节日抢红包等,往往都会带来临时性的流量洪峰,这对后端的各个应用系统考验是十分巨大的,如果直接采用扩容方式应对又会带来一定的资源浪费。RocketMQ 可以应对突发性的流量洪峰,在峰值时堆积消息,而在峰值过去后下游系统慢慢消费消息,解决上下游处理能力不匹配,提升系统可用性。

 还有等等

顺序收发

顺序消息是消息队列 RocketMQ 提供的一种高级消息类型,对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发送的消息先消费,后发送的消息后消费。顺序消息常用于以下业务场景:

订单创建场景:在一些电商系统中,同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息必须严格按照先后顺序来进行生产或者消费,否则消费中传递订单状态会发生紊乱,影响业务的正常进行。因此,该订单的消息必须按照一定的顺序在客户端和消息队列中进行生产和消费,同时消息之间有先后的依赖关系,后一条消息需要依赖于前一条消息的处理结果。

日志同步场景:在有序事件处理或者数据实时增量同步的场景中,顺序消息也能发挥较大的作用,如同步 mysql 的 binlog 日志时,需要保证数据库的操作是有顺序的。

金融场景:在一些撮合交易的场景下,比如某些证券交易,在价格相同的情况下,先出价者优先处理,则需要按照FIFO的方式生产和消费顺序消息。

五、如何使用

 集成到springBoot

mq配置信息项

server:
     port: 8082


   #rocketmq配置信息
   rocketmq:
     # tdmq-rocketmq服务接入地址
     name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876
     # 生产者配置
     producer:
       # 生产者组名
       group: group111
       # 角色密钥
       access-key: eyJrZXlJZC....
       # 已授权的角色名称
       secret-key: admin
     # 消费者公共配置
     consumer:
       # 角色密钥
       access-key: eyJrZXlJZC....
       # 已授权的角色名称
       secret-key: admin


     # 用户自定义配置
     namespace: MQ_INST_rocketmqxxxxxxxx
     producer1:
       topic: testdev1
     consumer1:
       group: group111
       topic: testdev1
       subExpression: TAG1
     consumer2:
       group: group222
       topic: testdev1
       subExpression: TAG2

<!-- in your <dependencies> block -->
<dependency>
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-client</artifactId>
     <version>4.9.3</version>
</dependency>


<dependency>
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-acl</artifactId>
     <version>4.9.3</version>
</dependency>

创建生产者 

// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer(
     namespace, 
     groupName,
     new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) 
     // ACL权限
);
// 设置NameServer的地址
producer.setNamesrvAddr(nameserver);
// 启动Producer实例
producer.start();

发送消息

for (int i = 0; i < 10; i++) {
     // 创建消息实例,设置topic和消息内容
     Message msg = new Message(topic_name, "TAG", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
     // 发送消息
     SendResult sendResult = producer.send(msg);
     System.out.printf("%s%n", sendResult);
}

异步发送

// 设置发送失败后不重试
producer.setRetryTimesWhenSendAsyncFailed(0);
// 设置发送消息的数量
int messageCount = 10;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
     try {
             final int index = i;
             // 创建消息实体,设置topic和消息内容
             Message msg = new Message(topic_name, "TAG", ("Hello rocketMq " + index).getBytes(RemotingHelper.DEFAULT_CHARSET));
             producer.send(msg, new SendCallback() {
                     @Override
                     public void onSuccess(SendResult sendResult) {
                             // 消息发送成功逻辑
                             countDownLatch.countDown();
                             System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                     }


                     @Override
                     public void onException(Throwable e) {
                             // 消息发送失败逻辑
                             countDownLatch.countDown();
                             System.out.printf("%-10d Exception %s %n", index, e);
                             e.printStackTrace();
                     }
             });
     } catch (Exception e) {
             e.printStackTrace();
     }
}
countDownLatch.await(5, TimeUnit.SECONDS);

创建消费者

// 实例化消费者
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(
     namespace,                                                  
     groupName,                                              
     new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); //ACL权限
// 设置NameServer的地址
pushConsumer.setNamesrvAddr(nameserver);

消费信息

// 实例化消费者
DefaultLitePullConsumer pullConsumer = new DefaultLitePullConsumer(
     namespace,                                               
     groupName,                                             
     new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)));
// 设置NameServer的地址
pullConsumer.setNamesrvAddr(nameserver);
// 设置从第一个偏移量开始消费
pullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

发布与订阅模式

发布到订阅

// 订阅topic
pushConsumer.subscribe(topic_name, "*");
// 注册回调实现类来处理从broker拉取回来的消息
pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
     // 消息处理逻辑
     System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
     // 标记该消息已经被成功消费, 根据消费情况,返回处理状态
     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者实例
pushConsumer.start();

订阅信息

// 订阅topic
pullConsumer.subscribe(topic_name, "*");
// 启动消费者实例
pullConsumer.start();
try {
     System.out.printf("Consumer Started.%n");
     while (true) {
             // 拉取消息
             List<MessageExt> messageExts = pullConsumer.poll();
             System.out.printf("%s%n", messageExts);
     }
} finally {
     pullConsumer.shutdown();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/714727.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

睿铂相机同步性控制技术解析

极客睿铂 前几期睿铂给大家分享了一些倾斜相机背后的技术&#xff0c;主要都是的关于镜头光学方面的。但实际上倾斜摄影相机还有很多其他关键性技术有待突破&#xff0c;任何技术的发展都不能一蹴而就&#xff0c;需要根据客户的问题反馈&#xff0c;发现新的问题并解决问题&a…

自定义MVC架构【下】

目录 一、前言 二、导出自定义MVC架包 三、使用自定义MVC架包 四、优化增删改查Dao层及Servlet 1.优化增删改查Dao层 2.优化增删改查Servlet代码 五、案例实操 1.将PageTag自定义标签进行配置 2.jsp页面环境搭建 3.案例演示 一、前言 在上篇中&#xff0c;我们已经优化…

ARM架构(寄存器点灯)

文章目录 前言一、LED原理图二、使用寄存器点灯的步骤三、如何操作寄存器四、实际操作1.使能GPIO端口2.将引脚设置为输出模式3.设置输出状态 五、全部代码总结 前言 本篇文章我们来讲解一下如何使用寄存器点亮一个LED灯&#xff0c;一般对于新人来说都是使用HAL库或者标准库来…

SpringBoot3【④ 基础特性】

1. SpringApplication 1.1. 自定义 banner 类路径添加banner.txt或设置spring.banner.location就可以定制 banner推荐网站&#xff1a;Spring Boot banner 在线生成工具&#xff0c;制作下载英文 banner.txt&#xff0c;修改替换 banner.txt 文字实现自定义&#xff0c;个性化…

操作系统面试知识点

1、进程、线程和协程的区别和联系 1、进程是资源调度的基本单位&#xff0c;运行一个可执行程序会创建一个或多个进程&#xff0c;进程就是运行起来的可执行程序 2、线程是程序执行的基本单位&#xff0c;是轻量级的进程。每个进程中都有唯一的主线程&#xff0c;且只能有一个…

机器学习第三课(sklearn接口)

一、sklearn基本知识 中文官网 英文官网 注意&#xff1a;sklearn第三方模块的安装 要用pip install scikit-learn from sklearn.neighbors import KNeighborsClassifier # 1 准备数据 # 训练集的特征数据 2维 x [[-2],[-1],[2],[3],[4]] # 训练集的目标数据 1维 y [0,0,1,…

HTML培训心得体会五篇(合集)

HTML5培训心得一 关于html5培训心得总结? 一&#xff1a;了解HTML5前端开发技术? ?? HTML?指的是超文本标记语言?(Hyper?Text?Markup?Language)&#xff0c;标记语言是一套标记标签?(markup?tag)&#xff0c;HTML?使用标记标签来描述网页。HTML5区别于HTML的标…

北京大学2015计算机学科夏令营上机考试(未完)

A:整数的个数 #include<iostream> using namespace std; int main(){int k,a;cin>>k;int sum10,sum20,sum30;for(int i0;i<k;i){cin>>a;if(a1) sum1;if(a5) sum2;if(a10) sum3;}cout<<sum1<<endl<<sum2<<endl<<sum3;retur…

quiche编译

netty http3使用了rust语言的quiche&#xff0c;quiche使用了c语言的boringssl&#xff0c; 网上没有找到编译好的quiche&#xff0c;只能自己搭建rust环境编译 1、rust安装 见官网https://www.rust-lang.org/tools/install 我是用的是windows的ubuntu&#xff0c;所以直接使…

苹果市值再度突破3万亿美元

KlipC报道&#xff1a;当地时间周五&#xff0c;苹果市值再度突破3万亿美元&#xff0c;这是近43年来&#xff0c;苹果第二次市值达到3万亿。 KlipC的合伙人Andi Duan表示&#xff1a;“得益于苹果股价上涨&#xff0c;以及硅谷银行引发的金融市场动荡&#xff0c;再加上高端IP…

【嵌入式Qt开发入门】如何创建并连接信号与槽

创建信号 我们先新建一个项目&#xff0c;命名为 signal_slot_example&#xff0c;如果还不会新建项目&#xff0c;请回到 【嵌入式Qt开发入门】初识信号与槽查看项目如何建立的。取消勾选*ui 文件&#xff0c;其他步骤不变。 由于信号只需声明&#xff0c;无需定义。所以我们只…

Mac端显示服务器上show的内容

Mac端显示服务器上show的内容 1. 需求描述 在Mac端&#xff08;终端和PyCharm中&#xff09;编写代码&#xff0c;在服务器端运行程序。需要在Mac端显示服务器端运行的内容&#xff0c;比如&#xff0c;运行的视频等。 2. 常见报错 SSH 运行命令时报错示例。 (cv) czjing…

旋转链表:给你一个链表的头节点 head ,旋转链表,将链表每个节点向右移动 k 个位置。

解题思路&#xff1a; 1.找到原链表的最后一个节点 2.计算链表长度n 3.将原链表的尾节点指向链表的头节点&#xff0c;使链表形成环 4.新链表根据画图可知&#xff0c;会在环的&#xff08;n-k%n&#xff09;的地方断开&#xff0c;这个节点newtail是新链表的尾节点 5.将新…

Ubuntu 20.04 LTS x86_64 安装 stable-diffusion-webui

官网 Stable Diffusion官网 Stability AI 官方github GitHub - Stability-AI/stablediffusion: High-Resolution Image Synthesis with Latent Diffusion Models stable-diffusion-webui github https://github.com/AUTOMATIC1111/stable-diffusion-webui 安装 下载sta…

ruoyi页面切换查询条件保留

场景描述 业务遇到需求&#xff0c;管理平台的页面打开后输入的查询条件、已经点击的页码、已经查询的数据要求保留下来&#xff0c;在tab菜单切换时保留&#xff0c;在关闭菜单时重置清空。 解决方案 1.使用cookie、localStorage或者sessionStorage 这个方式能解决部分需求…

windows10或者Ubuntu20.04内启动USB外接摄像头并拍照

1. windows10系统内启动摄像头并拍照 1.1 把带摄像头的USB接口插到电脑的USB接口上。 1.2 左下角搜索“设备管理器”&#xff0c;并点击&#xff0c;主要是确保笔记本自带的摄像头被禁用&#xff0c;和usb连接的外部摄像头被起用。 1.3 左下角搜索“相机”&#xff0c;并点击拍…

读《effective modern c++》笔记总结

文章目录 一、类型推导与auto模板类型推导ParamType是一个指针或引用&#xff0c;但不是通用引用ParamType是一个通用引用ParamType即不是指针也不是引用数组实参函数实参 auto类型推导 二、decltype的理解三、优先考虑auto而非显示类型声明四、区别使用&#xff08;&#xff0…

大华 海康 宇视 摄像头 onvif协议 调整时间 开发过程 整理

1、onvif官网 查看SetSystemDateAndTime 方法。 2、下载 ONVIF Device Test Tool 工具&#xff0c;使用教程可以 在这查看。 3、根据Test Tool 工具生成的request进行 Send request 测试。 有了这个本质就是http请求了&#xff0c;我认为可以自己写http请求尝试&#xff0c;我…

大地200C

8芯网线 【24&#xff0c;M03&#xff0c;冷却&#xff0c; m35&#xff0c;m34&#xff0c;m33&#xff0c;m32&#xff0c;24】 冷却【m08开&#xff0c;m09关】 M10夹紧M11松开 M18润滑【m127开&#xff0c;m227关】 X轴&#xff1a;5000 3.0A Y轴&#xff1…

Mybatis-Plus学习4 Page分页

ctrl P 查看可填的属性类型 alt 回车 自动填充数据类型 1、使用Page分页需要先配置config类&#xff0c;加上拦截器 Configuration MapperScan("com/learn/mybatisplus/mapper") public class MybatisPlusConfig {Beanpublic MybatisPlusInterceptor mybatisP…