目录
消息队列的作用
消息队列的优势
应用解耦
异步提速
削峰填谷
RocketMQ介绍
RocketMQ特点
RocketMQ安装下载(4.9.5版本)
RocketMQ启动可视化管理服务
RocketMQ实现基本消息收发
消息队列的作用
队列是一种FIFO先进先出的数据结构。消息则是跨进程传递的数据。一个典型的MQ系统,会将消息消息由生产者发送到MQ进行排队,然后根据一定的顺序交由消息的消费者进行处理。
消息队列的优势
应用解耦
假如用户访问订单系统,而订单系统跟其他系统是强耦合的,如图如果库存系统挂了,那么整个订单系统也都不能用了。如果这种情况还想要增加新的XX系统进来,那么就只能修改源代码来完成。系统的耦合性越高,容错性就越低,可维护性就越低。
通过引入MQ做到应用解耦,库存系统出现异常可以等库存系统恢复后去MQ中拿消息,此时不影响别的系统调用,如果还要加入新的系统比如XX系统,那么只需XX系统去MQ中拿取消息进行处理即可。使用MQ可以提升容错性和可维护性。
异步提速
原先用户请求订单系统,需要等到订单系顺序调用其他系统无误后返回,比较耗时。
现在通过引入MQ,订单系统只需要把信息发送到MQ中即可,相当于完成了之前顺序请求其他系统的步骤,时间成本大大减低。
削峰填谷
以上场景中激增请求会打垮系统,造成服务不可用。
通过将激增请求先放到MQ当前,然后系统再根据自身情况拉取请求来消费
RocketMQ介绍
RocketMQ是阿⾥巴巴开源的⼀个消息中间件,在阿⾥内部历经了双⼗⼀等很多⾼并发场景的考验,能够处理亿万级别的消息。2016年开源后捐赠给Apache,现在是Apache的⼀个顶级项⽬。
早期阿⾥使⽤ActiveMQ,但是,当消息开始逐渐增多后,ActiveMQ的IO性能很快达到了瓶颈。于是,阿⾥开始关注Kafka。但是Kafka是针对⽇志收集场景设计的,他的⾼级功能并不是很贴合阿⾥的业务场景。尤其当他的 Topic过多时,由于Partition⽂件也会过多,这就会加⼤⽂件索引的耗时,会严重影响IO性能。于是阿⾥才决定⾃研中间件,最早叫做MetaQ,后来改名成为RocketMQ。最早他所希望解决的最⼤问题就是多Topic下的IO性能压⼒。但是产品在阿⾥内部的不断改进,RocketMQ开始体现出⼀些不⼀样的优势。
RocketMQ特点
RocketMQ安装下载(4.9.5版本)
RocketMQ的官⽹地址: RocketMQ · 官方网站 | RocketMQ 。在下载⻚⾯可以获取RocketMQ的源码包以及运⾏包。下载⻚⾯地址:下载 | RocketMQ
下载完成后解压后如图:
配置环境变量
配置完成后进入bin目录下,打开cmd通过以下命令启动nameserver和broker。
//启动nameserver
start mqnamesrv.cmd
//启动broker
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
启动后如下图所示即启动完成
RocketMQ启动可视化管理服务
在之前的下载界面,一直往下拉会看到如下
这是一个springboot项目,要求maven版本在3.2以上,可以选择直接启动,也可以选择将该项目打成一个jar包,然后用java -jar的方式启动,本人因为用jar包启动访问管理服务很慢,所以选择本地启动项目。访问localhost:8080即可访问。
RocketMQ实现基本消息收发
引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.5</version>
</dependency>
生产者代码
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 2; i++) {
try {
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
msg.setKeys("testProducer"+i);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}
消费者代码
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
msgs.forEach(messageExt -> {
try {
System.out.println("收到消息:"+new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));
System.out.println("消息Key:"+messageExt.getKeys());
} catch (UnsupportedEncodingException e) {}
});
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
启动后结果如下: