一起学编程,让生活更随和!
如果你觉得是个同道中人,欢迎关注博主gzh:【随和的皮蛋桑】。
专注于Java基础、进阶、面试以及计算机基础知识分享🐳。偶尔认知思考、日常水文🐌。
目录
- 一、RocketMQ 介绍
- 1、RocketMQ是什么?
- 2、MQ 的应用场景
- 2.1、异步任务处理
- 2.2、应用程序解耦合
- 2.3、日志收集
- 3、消息队列技术选型对比
- 二、RocketMQ 安装和基础使用
- 1、RocketMQ组成结构
- 1.1、交互过程
- 1.2、集群作用
- 2、RocketMQ安装
- 2.1、本地Windows安装
- 2.1.1、环境要求
- 2.1.2、下载
- 2.1.3、环境搭建
- 2.2.4、测试
- 2.2、云服务器Linux安装
- 2.2.1、环境要求
- 2.2.2、环境搭建
- 2.2.3、测试
- 3、安装可视化控制台
一、RocketMQ 介绍
1、RocketMQ是什么?
RocketMQ是阿里开源的一款非常优秀中间件产品,脱胎于阿里的另一款队列技术MetaQ,后捐赠给Apache基金会作为一款孵化技术,仅仅经历了一年多的时间就成为Apache基金会的顶级项目。并且它现在已经在阿里内部被广泛的应用,并且经受住了多次双十一的这种极致场景的压力。(2017年的双十一,RocketMQ流转的消息量达到了万亿级,峰值TPS达到5600万)。并且其内部通过Java语言开发,便于阅读与修改。
RocketMQ官: http://rocketmq.apache.org/
2、MQ 的应用场景
MQ全称为Message Queue,即消息队列, 开发中消息队列通常有如下应用场景:
2.1、异步任务处理
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
传统方式:
改造后,交互流程如下:
- 订单服务发消息到消息队列。
- 消息队列将消息发给仓储服务和物流服务。
- 仓储服务和物流服务接收到消息进行业务处理。
2.2、应用程序解耦合
MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
举例:上图中,消息队列将订单服务和仓储服务进行解耦合,将订单服务和物流服务进行解耦合。
2.3、日志收集
进行统一业务日志收集,供分析系统进行数据分析,消息队列作为日志数据的中转站。
交互流程如下:
1、采集系统从log日志文件采集数据,发送至消息队列 。
2、各日志需求服务从消息队列 接收消息进行日志处理。
3、消息队列技术选型对比
市场上还有哪些消息队列?
ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,Redis等
本项目选用RocketMQ的一个主要原因如下 :
- 支持事务消息
- 支持延迟消息
- 天然支持集群、负载均衡
- 支持指定次数和时间间隔的失败消息重发
详细的技术选型对比如下:
RabbitMQ:
优点:
1.支持AMQP协议
2.基于erlang语言开发,高并发性能较好
3.工作模式较为灵活
4.支持延迟消息
5.提供较为友好的后台管理页面
6.单机部署,1~2WTPS
缺点:
1.不支持水平扩容
2.不支持事务
3.消息吞吐量三者最差
4.当产生消息堆积,性能下降明显
5.消息重发机制需要手动设置
6.不支持消息重复消费
RocketMQ:
优点:
1.高可用,高吞吐量,海量消息堆积,低延迟性能上,都表现出色
2.api与架构设计更加贴切业务场景
3.支持顺序消息
4.支持事务消息
5.支持消息过滤
6.支持重复消费
7.支持延迟消息
8.支持消息跟踪
9.天然支持集群、负载均衡
10.支持指定次数和时间间隔的失败消息重发
11.单机部署,5~10WTPS
缺点:
1.生态圈相较Kafka有所不如
2.消息吞吐量与消息堆积能力也不如Kafka
3.不支持主从自动切换
4.只支持Java
Kafka:
优点:
1.高可用,高吞吐量,低延迟性能上,都表现出色
2.使用人数多,技术生态圈完善
3.支持顺序消息
4.支持多种客户端
5.支持重复消费
缺点:
1.依赖分区,消费者数量受限于分区数
2.单机消息过多时,性能下降明显
3.不支持事务消息
4.不支持指定次数和时间间隔的失败消息重发
二、RocketMQ 安装和基础使用
1、RocketMQ组成结构
1.1、交互过程
- Brokder定时发送自身状态 到NameServer。
- Producer 请求NameServer获取Broker的地址。
- Producer 将消息发送到Broker中的消息队列。
- Consumer订阅Broker中的消息队列,通过拉取消息,或由Broker将消息推送至Consumer。
1.2、集群作用
1)Producer Cluster 消息生产者集群
- 负责发送消息,一般由业务系统负责产生消息。
2)Consumer Cluster 消费者集群
- 负责消费消息,一般是后台系统负责异步消费。
- 两种消费模式:
- Push Consumer,服务端向消费者端推送消息
- Pull Consumer,消费者端向服务定时拉取消息
3)NameServer 名称服务器
- 集群架构中的组织协调员,相当于注册中心,收集 broker的工作情况,不负责消息的处理
4)Broker 消息服务器
- 是 RocketMQ的核心,负责消息的接受,存储,发送等。
- 需要定时发送自身状态 到NameServer,默认10秒发送一次,超时2分钟会认为该broker失效。
2、RocketMQ安装
RocketMQ安装:本地Windows版本和云服务器Linux版本。
2.1、本地Windows安装
2.1.1、环境要求
- 64位JDK 1.8+;
- Maven 3.2.x;
- 64位操作系统。
2.1.2、下载
官网下载地址:https://archive.apache.org/dist/rocketmq/
根据需要下载对应的版本,本文介绍采用:4.5.0
下载后解压到一个没有空格和中文的目录。
2.1.3、环境搭建
maven 采用3.2.x 及以上版本均可(自行找教程下载安装即可)。
1)参数配置
开发环境不需要太多的内存,这里调小一点。
Broker默认磁盘空间利用率达到85%就不再接收,这里在开发环境可以提高磁盘空间利用率报警阀值为98%。
调整默认的内存大小参数,切换到bin
目录下,编辑runserver.cmd
:
cd bin/
vim runserver.cmd
修改为如下:
set "JAVA_OPT=%JAVA_OPT% ‐server ‐Xms512m ‐Xmx512m ‐Xmn512m ‐XX:MetaspaceSize=128m ‐XX:MaxMetaspaceSize=320m"
bin
目录下,编辑修改runbroker.cmd
:
cd bin/
vim runbroker.cmd
修改为如下:
set "JAVA_OPT=%JAVA_OPT% ‐server ‐Drocketmq.broker.diskSpaceWarningLevelRatio=0.98 ‐Xms512m ‐Xmx512m ‐Xmn512m"
指定broker
的命名空间地址,编辑broker.conf
:
默认情况下,namesrvAddr
为127.0.0.1:9876
。
2)配置环境变量
路径只到解压目录即可:
3)启动NameServer
打开cmd窗口,切换到bin
目录:
d:
cd D:\Software\MQ\rocketmq-all-4.5.0-bin-release\bin
start mqnamesrv.cmd
4)启动Broker
打开cmd窗口,切换到bin
目录下:
# -n:指定NameServer的地址
start mqbroker.cmd ‐n 127.0.0.1:9876 autoCreateTopicEnable=true
此窗口也不要关闭。
2.2.4、测试
bin
目录下专门提供了一个tools.cmd
工具类,可以测试namesrv
和broker
是否真正启动成功!
1)打开cmd窗口【发送消息】
set NAMESRV_ADDR=127.0.0.1:9876
cd D:\Software\MQ\rocketmq-all-4.5.0-bin-release\bin
# 启动生产者
tools.cmd org.apache.rocketmq.example.quickstart.Producer
如果启动报错,仔细检查启动namesrv
和broker
的参数配置,以及tools.cmd
的参数配置!!!
正确启动如下:
2)打开cmd窗口【接收消息】
set NAMESRV_ADDR=127.0.0.1:9876
cd D:\Software\MQ\rocketmq-all-4.5.0-bin-release\bin
# 启动消费者
tools.cmd org.apache.rocketmq.example.quickstart.Consumer
windows下安装和测试完成!!!
2.2、云服务器Linux安装
2.2.1、环境要求
下载地址和环境要求和上述要求大致保持一致!!!
2.2.2、环境搭建
前提是要拥有一个云服务器,我购买的是阿里云轻量级服务器,自己玩够用了,当然如果没条件也可以自己在本地使用VMware一样效果。
如果想购买云服务器,传送门:https://blog.csdn.net/qq_52596258/article/details/121290659
1)使用远程链接工具(Tabby)和SFTP工具将其上传到服务器上:
2)解压
# 解压
unzip rocketmq-all-4.5.0-bin-release.zip
# 将解压包,移动到指定路径下
mv rocketmq-all-4.5.0-bin-release ../software
3)参数配置
runserver.sh
runbroker.sh
tools.sh
上述配置文件的jvm参数和windows下的配置保持一致即可!!!
云服务器需要在broker.conf
文件最下面添加如下配置:
brokerIP = 服务器公网IP
# eg
brokerIP = 106.15.0.30
4)操作指令【bin目录下】
# 启动 nameserver
nohup sh mqnamesrv -n 106.15.0.30:9876 &
# 启动 broker
nohup sh mqbroker -n 106.15.0.30:9876 -c ../conf/broker.conf autoCreateTopicEnable=true &
# 查看进程
jps -l
# 关闭服务
sh mqshutdown namesrv
sh mqshutdown broker
2.2.3、测试
1)启动Producer【发送消息】
# 生产者
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
2)启动Consumer【接收消息】
# 消费者
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
Linux下搭建成功!
3、安装可视化控制台
1)下载
RocketMQ提供了UI管理工具,名为rocketmq-console
,项目地址:https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console
这个是rocketmq的扩展,里面不仅包含控制台的扩展,也包含对大数据flume、hbase等组件的对接和扩展。
2)上传解压
为了演示服务器版搭建,我直接下载上传至云服务器:
# 解压
unzip rocketmq-console.zip
# 移动到software目录下
mv rocketmq-console ../software
3)修改配置参数
修改rocketmq-console\src\main\resources\application.properties
配置文件
4)打包
进入rocketmq-console目录下
打开cmd窗口执行:
mvn clean package ‐Dmaven.test.skip=true
5)启动
进入/rocketmq-console/target
目录下执行:
# 指定端口号和命名空间地址
java -jar rocketmq-console-ng-1.0.1.jar --server.port=9877 --rocketmq.config.namesrvAddr=106.15.0.30:9876
虽然console
启动成功,想控制台不报错误日志,必须先启动namesrv
和broker
,让console
连接上命名空间和broker
。
6)访问
http://106.15.0.30:9877
如果无法访问,很有可能是服务器的防火墙没有开放端口号,步骤如下:
# 查看防火墙状态
systemctl status firewalld
# 关闭防火墙
systemctl stop firewalld
# 启动防火墙
systemctl start firewalld
# 永久开放指定端口号【把用到的端口号都开放】
firewall-cmd --zone=public --add-port=10909/tcp --permanent
firewall-cmd --zone=public --add-port=10911/tcp --permanent
firewall-cmd --zone=public --add-port=9876/tcp --permanent
firewall-cmd --zone=public --add-port=9877/tcp --permanent
# 重新加载防火墙
firewall-cmd --reload
# 或者重启防火墙
systemctl restart firewalld.service
# 查看防火墙信息列表
firewall-cmd --list-all
# 只查看防火墙开放端口号列表
firewall-cmd --list-ports
防火墙端口号开放之后,同时也要在ECS服务器安全组中添加端口规则:
入方向和出方向最好都添加上。
再次访问:http://106.15.0.30:9877
如果还不行,那这个时候不要着急,着急也没有用,我们先来理下思路,
- 防火墙端口号打开了
- 安全组规则添加了
还访问不了,是不是还有防火墙,果然上网搜索,发现阿里云服务器还有一层防火墙iptables
,应该是默认配置的,如何关闭或者在这层防火墙也开放端口号,步骤如下:
# 查看防火墙状态出现的问题
service iptables status
# 关闭防火墙
service iptables stop
如果上述命令报错,可能是没有安装iptable
,但是默认确实有这一层防火墙的。
解决办法:打开阿里云进入轻量级服务器界面,左边菜单栏有一个防火墙tab,这个代表的就是iptable
,直接在这添加端口开放规则
再次访问:http://106.15.0.30:9877
大功告成,访问成功,撒花!!!