概述
RocketMQ是阿里开发的基于JMS的分布式消息中间件,是由纯JAVA实现的。放弃了比较重量级的Zookeeper作为注册中心,使用自研的NameServer实现元数据的管理,支持事务消息,顺序消息,批量消息,定时消息以及消息回溯。
- 支持发布订阅和点对点的两种消息模型
- 支持pull和push两种消息模式
- 单个队列可以支持百万的消息堆积能力
- 支持事务消息,重复消费,支持指定次数和时间间隔的失败消息重发
基本架构以及概念
RocketMQ使用轻量级的自研NameServer作为注册中心进行服务的协调工作,Producer, Consumer, Broker集群在启动之后都会将自身信息注册到NameServer中,三者都会与NameServer建立长连接,通过定时轮询的方式获取服务最新信息。
NameServer相对于Kafka中的Zookeeper是一个非常简单的Topic路由注册中心,NameServer完全是无状态的,可以使用集群进行部署,而NameServer Cluster中的各个NameServer节点相互不会同步任何信息。NameServer会存储Topic和Broker的信息。
- Broker启动时会向所有的NameServer进行注册,NameServer和各个Broker之间保持长连接,Broker每隔30s向NameServer发送心跳信息,而NameServer每隔10s会轮询检查Broker是否存活。Broker宕机之后,会从路由表中删除,但是Producer不会马上感知,NameServer自身有一套完整的容错机制。
- Producer启动之后是随机选择一个NameServer进行长连接,获取路由信息,并与提供Topic服务的Master节点建立长连接
- Consumer启动之后同样是随机选择一个NameServer进行长连接,获取路由信息,与提供Topic服务的Master/Slave节点建立长连接
在上图中NameServer集群中有3个NameServer节点;Producer集群有两个群组,每个群组中有两个Producer节点,Consumer配置类似。RocketMQ集群有两组Broker,分别都有主从之分,而且分别和Topic A, Topic B进行绑定,例如, BrokerA Master表示此Broker负责存储Topic 数据并是Master主节点。Producer从NameServer获取broker信息之后,选择该Topic绑定的Master进行发送数据;Consumer从NameServer获取broker信息之后,既可以绑定Master节点也可以绑定Slave节点,具体的规则由broker决定。这里和Kafka Consumer不太一样, Kafka不论Producer还是Consumer都只能绑定Master节点。
以下是RocketMQ部分概念解析
- Producer:消息生产者
- Producer Group:消息生产者群组
- Consumer:消息消费者
- Consumer Group: 消费者群组,包含多个消费者,同组消费者消费同一个Topic下不同的分区的消息。
- Broker: RocketMQ实例,可以理解为不同的RocketMQ服务器,每个都有一个唯一的编号。
- Message: 生产者传递给消费者的消息体
- Topic: 消息的主题,Broker上可以新建不同的Topic, Message发送到不同的Topic供消费者消费。
- Tag:标签,相当于一个子Topic,可以区分同一主题下不同业务的消息
- Queue: Topic和Queue是一对多的关系,主要用于负载均衡
- Offset: 持久化的时候在每个Topic下的每个Queue会生成一个消息的索引文件,Offset记录当前Queue中的消息总数
- NameServer:消息注册中心,管理两部分信息:1.Topic和Queue的路由配置信息 2.Broker的实时配置信息
具体工作流程
RocketMQ同样采用顺序IO的方式将所有Topic的消息都写入同一个文件中,这个文件就是CommitLog.由于不同的topic消息都会混淆在一起,而Consumer又是按照topic来消费消息的,这样的话势必会去遍历commitLog文件来过滤topic,这样性能肯定会非常差,所以rocketMq采用ConsumeQueue来提高消费性能。即每个Topic下的每个queueId对应一个Consumequeue,其中存储了单条消息对应在commitLog文件中的物理偏移量offset,消息大小size,消息Tag的hash值。
Producer使用轮询的方式分别向每个Queue中发送消息。
Consumer会采用负载均衡角度,为每个Consumer Group下的所有Consumer平均分配需要处理的Queue。可以利用这一机制,实现客户端的动态扩容。
当queue 个数大于 Consumer个数, 那么 Consumer 会平均分配 queue,不够平均,会根据clientId排序来拿取余数queue个数小于Consumer个数,那么会有Consumer闲置,就是浪费掉了,其余Consumer平均分配到queue。
消息的刷盘机制同样有两种:
同步刷盘:当消息持久化完成之后,Broker才会返回给Producer一个Ack。
异步刷盘:只要消息写入PageCache即可将成功的ACK返回给Producer端。
集群搭建
本文采用docker-compose部署RocketMQ集群以及UI页面,docker版本:18.06.3-ce docker-compose版本:1.24.1。下图中的10.232.112.13为宿主机的IP,注意需要替换
1. 在/usr/local/server/rocketmq_cluster/conf1
创建broker.conf
配置文件
# 所属集群名称,如果节点较多可以配置多个
brokerClusterName = ItmentuCluster
#broker名称,master和slave使用相同的名称,表明他们的主从关系
brokerName = broker-a
#0表示Master,大于0表示不同的slave
brokerId = 0
#表示几点做消息删除动作,默认是凌晨4点
deleteWhen = 04
#在磁盘上保留消息的时长,单位是小时
fileReservedTime = 48
#有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制;
brokerRole = ASYNC_MASTER
#刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH
2. 在/usr/local/server/rocketmq_cluster/conf2
创建broker.conf
配置文件
# 所属集群名称,如果节点较多可以配置多个
brokerClusterName = ItmentuCluster
#broker名称,master和slave使用相同的名称,表明他们的主从关系
brokerName = broker-b
#0表示Master,大于0表示不同的slave
brokerId = 0
#表示几点做消息删除动作,默认是凌晨4点
deleteWhen = 04
#在磁盘上保留消息的时长,单位是小时
fileReservedTime = 48
#有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制;
brokerRole = ASYNC_MASTER
#刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH
————————————————
版权声明:本文为CSDN博主「拥有1024的蜡笔小新」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/scmagic/article/details/124771179
3.docker-compose文件建立集群
version: '3'
services:
#Service for nameserver
namesrv:
image: rocketmqinc/rocketmq:4.4.0
container_name: cluster_rmqnamesrv
ports:
- 9876:9876
volumes:
- /usr/local/server/rocketmq_cluster/data_namesrv/namesrv/logs:/root/logs
- /usr/local/server/rocketmq/data_namesrv/namesrv/store:/root/store
command: sh mqnamesrv
broker1:
image: rocketmqinc/rocketmq:4.4.0
container_name: cluster_rmqbroker1
links:
- namesrv
ports:
- 10909:10909
- 10911:10911
- 10912:10912
environment:
- NAMESRV_ADDR=namesrv:9876
volumes:
- /usr/local/server/rocketmq_cluster/data1/broker/logs:/root/logs
- /usr/local/server/rocketmq_cluster/data1/broker/store:/root/store
- /usr/local/server/rocketmq_cluster/conf1/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf
command: sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
depends_on:
- namesrv
broker2:
image: rocketmqinc/rocketmq:4.4.0
container_name: cluster_rmqbroker2
links:
- namesrv
ports:
- 10929:10909
- 10931:10911
- 10932:10912
environment:
- NAMESRV_ADDR=namesrv:9876
volumes:
- /usr/local/server/rocketmq_cluster/data2/broker/logs:/root/logs
- /usr/local/server/rocketmq_cluster/data2/broker/store:/root/store
- /usr/local/server/rocketmq_cluster/conf2/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf
command: sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
depends_on:
- namesrv
rmqconsole:
image: pangliang/rocketmq-console-ng
container_name: rmqconsole
ports:
- 7001:7001
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=10.232.112.13:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false -Dserver.port=7001"
depends_on:
- namesrv
Demo代码(待添加)
到这里,你了解RocketMQ了吗