RocketMQ实践与原理分析(Docker安装RocketMQ)

news2024/12/23 15:26:17

前言

QBM之前使用的消息中间件是ActiveMQ,后续需要升级为RocketMQ。

MQ广泛应用于很多业务场景中,主要的作用

  • 异步解耦
  • 削峰

常用MQ中间件对比,参考官方文档:https://rocketmq.apache.org/zh/docs/4.x/introduction/03whatis

协议和特点消息有序性定时消息批量消息广播消息消息过滤服务器触发的重新投递消息存储
ActiveMQPush model, support OpenWire, STOMP, AMQP, MQTT, JMSExclusive (独自)Consumer or Exclusive Queues can ensure orderingSupportedNot SupporedSupportedSupportedNot SupportedSupports very fast persistence using JDBC along with a high performance journal,such as levelDB, kahaDB
KafkaPull model, support TCPEnsure ordering of messages within a partitionNot SupportedSupported, with async producerNot SupportedSupported, you can use Kafka Streams to filter messagesNot SupportedHigh performance file storage
RocketMQPull model, support TCP, JMS, OpenMessagingEnsure strict ordering of messages,and can scale out gracefullySupportedSupported, with sync mode to avoid message lossSupportedSupported, property filter expressions based on SQL92SupportedHigh performance and low latency file storage

通过学习并结合业务知识,重点思考的问题:

  • 顺序性消费?顺序消费场景某个消息失败导致消息挤压?
  • 消息的挤压?如何根据业务划分topic和tag?相同tag分group?,同一业务消息有相同的key?
  • 消息消费的多线程问题?某个业务场景比如财务需要单线程消费?

目录

  • 概述
  • 实践
  • 原理分析

一、概述

RocketMQ是由阿里捐赠给Apache的一款低延迟、高并发、高可用、高可靠的分布式消息中间件。经历了淘宝双十一的洗礼。RocketMQ既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

基于最基础的发布订阅模型,而在实际的应用中,结构会更复杂。例如为了支持高并发和水平扩展,中间的消息主题需要进行分区(对应Message Queue),同一个Topic会有多个生产者,同一个信息会有多个消费者,消费者之间要进行负载均衡等。
image.png

ps:存储消息Topic的 代理服务器( Broker ),是实际部署过程对应的代理服务器。

核心概念

  • Group:一类生产者或者消费者,每个包含GroupID
  • Producer:消息发布者,RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。
  • Consumer:消息订阅者,从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费
  • Message Queue:每个Topic下会由一个到多个队列来存储消息,RocketMQ 对 Topic 进行了分区,这种操作被称为队列(MessageQueue)。
    • 死信队列:用于处理无法被正常消息的消息,当一条信息初次消费失败,消息队列RocketMQ会自动进行消息重试,达到重试最大次数仍然失败的话,若消费仍然失败,该消息不会被丢弃,而是直接发到设置的该Consumer对应的死信队列里面。
  • Topic:消息主题
  • Message:生产者向Topic发送并最终传给消费者的数据消息载体,生产者为消息定义的属性成为消息属性,包含Message Key和Tag,MQ本身会生成一个Message ID。
    • Message Key:消息的业务标识,由消息生产者设置,唯一标识某个业务逻辑。
    • Message ID:消息的全局唯一标识,由RocketMQ系统自动生成。
    • Tag:消息标签,Topic下的进一步区分。
  • Topic Partition:分区,物理上的概念,每个Topic包含一个或者多个分区。
  • 消费位点:每个Topic会有很多分区,每个分区会统计当前消息的总条数,这个称为最大位点MaxOffset,分区开始的消费位点为MinOffset。
    • 重置消费位点:以时间轴为坐标,在消息持久化存储的时间范围内(默认3天),重新设置Consumer对已订阅Topic的消费进度,设置完成后Consumer将接受设定时间点之后由Producer发送到消息队列服务端的消息。

其他消息类型相关概念

  • 事务性消息:Exactly-Once:Consumer消费一次仅能消费一次。
  • 集群消息:相同的ConsumerGroup下的消息消费,每个Consumer按照GroupID均分消费。
  • 广播消息:相同的ConsumerGroup下的消息消费,每个Consumer按照GroupID全量消费。
  • 定时消息:Producer将消息发送到RocketMQ服务端,不期望消息被马上投递,而是推迟到当前时间点之后某一个时间点才发给Consumer。
  • 延时消息:Producer将消息发送到RocketMQ服务端,不期望消息被马上投递,而是推迟一定时间才发给Consumer。
  • 事务消息:类似X/Open XA的分布式事务功能。
  • 顺序消息:一种按照顺序进行发布和消费的消息模型,分为全局顺序消息和分区顺序消息。
    • 全局顺序消息:对于指定的一个Topic,所有的消息按照严格的FIFO的顺序进行发布和消费。
    • 分区顺序消息:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区,同一分区内的消息按照严格的FIFO顺序进行发布的消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Message Key是完全不同的概念。

其他消息相关概念

  • 消息堆积:消息被堆积在了RocketMQ的服务端,Consumer没有能力消费来不及消费。
  • 消息过滤:Consumer可以根据Tag对消息进行过滤,确保Consumer最终只能接受到被过滤后的消息,过滤操作是在服务端完成的。
  • 消息轨迹:消息从生产到消费过程的链路追钟,方便定位排查问题。

参考:

  • https://rocketmq.apache.org/docs/quickStart/01quickstart
  • https://developer.aliyun.com/article/780968

二、实践

安装RocketMQ参考:https://rocketmq.apache.org/docs/quickStart/01quickstart

容器安装RocketMQ,需要分开安装Nameserver容器和Broker容器以及控制台Console容器,其中Nameserver和Broker的连接通过broker.conf

这样做是为了解耦和方便管理:https://juejin.cn/post/7218438764100108325

开发测试直接使用docker安装

# 拉取镜像
docker pull rocketmqinc/rocketmq

# 一、启动NameServer容器,创建一个新的容器并指定 RocketMQ 的镜像
docker run -d \
--name rmqnamesrv \
-p 9876:9876 \
-v /home/docker/mydata/rocketmq/conf:/root/config \
-v /home/docker/mydata/rocketmq/logs:/root/logs \
-e "JAVA_OPTS=-Duser.home=/opt" \
rocketmqinc/rocketmq \
sh mqnamesrv 

# 参数说明:
-d 以守护线程方式启动
--name rmqnamesrv 设置容器名称
-p 9876:9876 端口映射
-v 把容器内的/root/logs日志路径挂载到宿主机的自定义路径中(需根据自己的路径自行创建)
-v 把容器内的/root/store数据存储目录挂载到宿主机的自定义目录(需根据自己的路径自行创建)
rocketmqinc/rocketmq 使用镜像的名称
sh mqnamesrv 执行name server脚本


# 进入容器
docker exec -it d60b /bin/bash

# 修改broker.conf文件,设置通信的brokerIP
vi ... /conf/broker.conf,然后添加brokerIP1 = xxx.xxx.xxx.xxx,内容为宿主机的IP

# broker.conf的其他配置项

# 所属集群名称,如果节点较多可以配置多个
brokerClusterName = DefaultCluster
#broker名称,master和slave使用相同的名称,表明他们的主从关系
brokerName = broker-a
#0表示Master,大于0表示不同的slave
brokerId = 0
#表示几点做消息删除动作,默认是凌晨4点
deleteWhen = 04
#在磁盘上保留消息的时长,单位是小时
fileReservedTime = 48
#有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制;
brokerRole = ASYNC_MASTER
#刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH

# 回到宿主机,将broker.conf拷贝到宿主机
# nameserver容器内配置文件/opt/rocketmq-4.4.0/conf

docker cp d60b:/opt/rocketmq-4.4.0/conf/broker.conf /home/docker/mydata/rocketmq/conf/broker.conf






# 二、启动Broker容器

docker run -d  \
--name rmqbroker \
--link rmqnamesrv:namesrv \
-p 10911:10911 \
-p 10909:10909 \
-v  /home/docker/mydata/rocketmq/broker/logs:/root/logs \
-v  /home/docker/mydata/rocketmq/broker/store:/root/store \
-v /home/docker/mydata/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf \
-e "NAMESRV_ADDR=namesrv:9876" \
-e "MAX_POSSIBLE_HEAP=200000000" \
rocketmqinc/rocketmq \
sh mqbroker -c  ../conf/broker.conf 


# 参数说明
--link rmqnamesrv:namesrv  和rmqnamesrv容器通信
-p 10911:10911 把容器的非vip通道端口挂载到宿主机
-p 10909:10909 把容器的vip通道端口挂载到宿主机
-e “NAMESRV_ADDR=namesrv:9876”  指定namesrv的地址为本机namesrv的ip地址:9876
-e “MAX_POSSIBLE_HEAP=200000000” rocketmqinc/rocketmq sh mqbroker 指定broker服务的最大堆内存(暂未配置)
sh mqbroker -c  ../conf/broker.conf  读取../conf/broker.conf配置并启动broker


# 三、安装控制台
docker pull styletang/rocketmq-console-ng


docker run -d \
-p 8081:8080 \
-e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=120.46.82.131:9876 -Drocketmq.config.isVIPChannel=false" \
styletang/rocketmq-console-ng

# 四、访问控制台(别忘了开8081防火墙)
xxx.xxx.xxx.xxx:8081

三、原理分析

参卡:官网文档:https://rocketmq.apache.org/zh/docs/4.x/introduction/03whatis

3.1、RocketMQ部署模型

Producer、Consumer又是如何找到Topic和Broker的地址呢?消息的具体发送和接收又是怎么进行的呢?

RocketMQ部署架构上主要分为四个部分
image.png

  • 生产者Producer:通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败和重试。
  • 消费者Consumer:消费消息角色,支持推Push、拉Pull两种模式对消息消费,同时也支持集群方式和广播方式的消费。
  • 域名服务器NameServer:一个简单的Topic路由注册中心,支持Topic、Broker的动态注册与发现。一般集群部署,各实例间相互不进行信息通讯,集群中的每个NameServer都全量的保存完整的路由信息,某个NameServer下线也不影响可用性。主要包括两个功能
    • Broker管理:接受Broker集群的注册信息并保存下来作为路由信息,然后提供心跳检测机制,检测Broker是否还存活。
    • 路由信息管理:保存关于Broker集群的整个路由信息 和 用于客户端查询的队列信息。
  • **代理服务器Broker:**主要负责消息的存储、投递和查询以及服务高可用保证。因为各个Broker中的信息不一样,不能简单像NameServer那样直接集群部署,而是需要采取主从模式集群架构。Broker采取Master-Slave结构,通过指定相同的BrokerName、不同的BrokerId来区分主(BrokerId = 0)、从(BrokerId = 1)

小结

  • Broker注册:每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。
  • Producer注册:Producer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取Topic路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态。
  • Consumer注册:Consumer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave发送心跳。Consumer 既可以从 Master 订阅消息,也可以从Slave订阅消息。

// TODO

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1015791.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Android查看公钥与MD5

参考:填写App特征信息_备案-阿里云帮助中心 安卓应用获取App特征信息指导 包名、公钥和签名MD5获取方式有多种,本文以使用JadxGUI工具获取为例。 下载JadxGUI工具:GitHub - skylot/jadx: Dex to Java decompiler下载安装完成后,使…

饲料添加剂 微生物 屎肠球菌

声明 本文是学习GB 7300.503-2023 饲料添加剂 第5部分:微生物 屎肠球菌. 而整理的学习笔记,分享出来希望更多人受益,如果存在侵权请及时联系我们 1 范围 本文件规定了饲料添加剂屎肠球菌的技术要求、采样、检验规则、标签、包装、运输、贮存和保质 期&#xff0…

Routing路径系列数学建模(TSP+CVRP)

1.Traveling Salesperson Problem(TSP) 参考:维基百科TSP 给定一些城市和城市之间的距离,找到最短路径,经过每个城市最后返回起点,组合优化问题中属于NP-hard难度。对于TSP问题有两类混合整数规划模型:Miller–Tucker…

PYTHON-模拟练习题目集合

🌈write in front🌈 🧸大家好,我是Aileen🧸.希望你看完之后,能对你有所帮助,不足请指正!共同学习交流. 🆔本文由Aileen_0v0🧸 原创 CSDN首发🐒 如…

QScrollBar滚动条、QSlider滑块、 QDial表盘

QAbstractSlider 类、 QSCrollBar 类、 QSlider 类 一、 基本原理 1、 QAbstractSlider 继承自 QWidget,该类主要用于提供一个范围内的整数值, 2、 QAbstractSlider 类是 QScrollBar 类(滚动条)、 QSlider 类(滑块)、 QDial 类(表盘)的父类,因…

智能公厕大脑,提高城市公共厕所管理效率!

随着城市建设的不断发展,公共设施的完善也成为人们关注的重要问题之一。而城市公共厕所作为城市治理的一部分,管理效率和运营成本则直接关系到城市环境整体卫生和市民的生活质量。如何提高城市公共厕所管理能力和服务水平,成为城市治理和市民…

MQ的初步了解

目录 什么是MQ? 为什么要用MQ(MQ的优点)? MQ的缺点 常用的MQ产品 MQ使用中的常见问题 什么是MQ? 【1】MQ:MessageQueue,消息队列。 队列,是一种FIFO 先进先出的数据结构。消息由…

28335 GPIO作为输入的配置记录

28335 GPIO配置为输入,可以启动输入滤波功能,看了网上很多的讲解,把滤波配置记录一下: 主要是配置两个参数: GpioCtrlRegs.GPXCTRL.bit.QUALPRDX :用于配置采样的周期,由配置值和SYSCLKOUT共同…

Java面试题之——异常和错误

提示:解释Java中的异常和错误是什么,以及它们之间的区别是什么? 文章目录 前言从定义上来说:从处理方式来看:总结⭐️ 好书推荐 前言 提示:这里可以添加本文要记录的大概内容: 在Java编程语言…

PostgreSQL 如果想知道表中某个条件查询条件在索引中效率 ?

开头还是介绍一下群,如果感兴趣PolarDB ,MongoDB ,MySQL ,PostgreSQL ,SQL Server,Redis ,Oracle ,Oceanbase 等有问题,有需求都可以加群群内有各大数据库行业大咖,CTO,可以解决你的问题。加群请加微信号 liuaustin3 &…

NSS [HNCTF 2022 Week1]Challenge__rce

NSS [HNCTF 2022 Week1]Challenge__rce hint:灵感来源于ctfshow吃瓜杯Y4大佬的题 开题&#xff0c;界面没东西&#xff0c;源码里面有注释&#xff0c;GET传参?hint 传参后返回了源码 <?php error_reporting(0); if (isset($_GET[hint])) {highlight_file(__FILE__); }…

如何判断linux 文件(或lib)是由uclibc还是glibc编译出来的?

工作中使用的编译环境有2套编译器&#xff0c;一个是glibc&#xff0c;一个是uclibc。 有些项目使用的glibc编译的lib&#xff0c;和使用uclibc编译的工程&#xff0c;在一起就会出现reference的编译错误如下&#xff1a; 那和如何来判断一个文件是由哪个编译器编译的呢&#…

苹果cms大橙子vfed 5.0去授权完美破解主题模板

大橙模版算是在苹果 cms 众多主题里&#xff0c;较为亮眼的一款了&#xff0c;主题简洁&#xff0c;功能众多&#xff0c;非常的齐全。 今天分享的就是大橙 5.0 版本模板&#xff0c;自定义菜单输入下列代码使用主题设置和资源采集。 vfed 主题设置&#xff0c;/index.php/la…

MiniApp Dev 6

商城、会员、积分

通过 API 使用 React

&#x1f3ac; 岸边的风&#xff1a;个人主页 &#x1f525; 个人专栏 :《 VUE 》 《 javaScript 》 ⛺️ 生活的理想&#xff0c;就是为了理想的生活 ! 若使用相同的 Hello World! 应用 &#xff08;通过 React 生成并通过 Visual Studio Code 更新的应用&#xff09;&#x…

微信小程序开发--4.3订阅消息

首先在微信公众平台登录相应的微信小程序&#xff0c;左侧导航栏找到功能&#xff0c;点进去订阅消息&#xff0c;点击开通&#xff0c;点击选用&#xff0c;在公共模板库中选用订阅消息的模板。 js wx.requestSubscribeMessage({tmplIds:[aDRNef2_ty37dXyqVXyUADSyO8BXOZRWYi…

手撕双链表

> 作者简介&#xff1a;დ旧言~&#xff0c;目前大一&#xff0c;现在学习Java&#xff0c;c&#xff0c;c&#xff0c;Python等 > 座右铭&#xff1a;松树千年终是朽&#xff0c;槿花一日自为荣。 > 望小伙伴们点赞&#x1f44d;收藏✨加关注哟&#x1f495;&#x1…

3D视觉到三维视觉之结构光

3D视觉是计算机视觉的终极体现形式 2D视觉技术主要在二维空间下完成工作&#xff0c;三维信息基本上没有得到任何利用&#xff0c;而三维信息才真正能够反映物体和环境的状态&#xff0c;也更接近人类的感知模式。近年来&#xff0c;学术界和工业界推出了一系列优秀的算法和产…

【【萌新的RISC-V学习之再看计算机组成与设计大收获总六】】

萌新的RISC-V学习之再看计算机组成与设计大收获总六 我们在进行设计的时候首先要明白一点 就是 确定我们的设计所需要的 指令的大小和 地址的大小 指令集是32位的 而 地址则一般更多的是64位 数据也是64位 PC与指令寄存器之间的关系 PC是用来保存当前指令的地址。假设地址是0…

Huggingface遇到 Couldn‘t reach xxx on the Hub (ConnectionError)解决方法

文章目录 遇到的问题解决方法参考 遇到的问题 使用服务器下载Huggingface的数据集&#xff0c;显示ConnectionError: Couldn’t reach ‘Salesforce/dialogstudio’ on the Hub (ConnectionError) 具体代码如下&#xff1a; dataset load_dataset("Salesforce/dialogs…