序言
首先我要抛出几个问题让大家思考一下:为什么你的项目中要用MQ呢?使用MQ为你解决了什么问题?当然解决问题的同时它又有哪些弊端值得注意?
如果你不太清楚或者你根本没有考虑过,那么请往下看你会找到你想要的答案。
1.介绍
官网介绍:RabbitMQ is the most widely deployed open source message broker.(RabbitMQ是部署最广泛的开源信息代理。)
RabbitMQ采用Erlang语言开发,是AMQP(Advanced Message Queuing Protocol)的标准实现。支持持久化,支持多种客户端,如 Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等。常用于分布式系统中的存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
2.特点
- 分布式部署,支持集群模式、跨区域部署,以满足高可用、高吞吐量应用场景;
- 异步消息传递,支持多种消息传递协议、消息队列、传递确认机制,灵活的路由消息到队列,多种交换类型;
- 良好的开发者体验,可在许多操作系统及云环境中运行,并为大多数流行语言提供各种开发工具;
- 可插拔身份认证授权,支持 TLS(Transport Layer Security)和 LDAP(Lightweight Directory Access Protocol),轻量且容易部署到内部、私有云或公有云中;
- 有专门用于管理和监督的HTTP-API、命令行工具和 UI;
- 支持连续集成,可以插件方式灵活地扩展 RabbitMQ 的功能。
优点:
- 由于 Erlang 语言的特性,RabbitMQ 性能较好、高并发;
- 有消息确认机制和持久化机制,可靠性高;
- 高度灵活可定制的路由;
- 管理界面较丰富,在互联网公司也有较大规模的应用;
- 健壮、稳定、易用、跨平台、支持多种语言客户端、文档齐全;
- 社区活跃度高,更新快。
缺点:
- 尽管结合 Erlang 语言本身的并发优势,性能较好,但是不利于做二次开发和维护;
- 实现了代理架构,意味着消息在发送到客户端之前可以在中央节点上排队。此特性使得 RabbitMQ 易于使用和部署,但使得其运行速度较慢,因为中央节点增加了延迟,消息封装后也比较大;
- 需要学习比较复杂的接口和协议,学习和维护成本较高。
3.安装
安装官网:RabbitMQ
RabbitMQ支持window、unix、macos等平台安装,按自己的需要进行安装
管理端页面:http://localhost:15672/
账号密码:guest
4.集成
因为springboot已经有mq相关starter,所以我们之间引用依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
yml文件配置信息:
spring:
#rabbitmq配置
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
配置文件:
package com.iterge.iterge_pre.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author liuph
* @date 2023/10/17 15:50:37
*/
@Configuration
public class RabbitmqConfig {
public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
public static final String ROUTINGKEY_SMS="inform.#.sms.#";
//声明交换机
@Bean(EXCHANGE_TOPICS_INFORM)
public Exchange EXCHANGE_TOPICS_INFORM(){
//durable(true) 持久化,mq重启之后交换机还在
return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
}
//声明QUEUE_INFORM_EMAIL队列
@Bean(QUEUE_INFORM_EMAIL)
public Queue QUEUE_INFORM_EMAIL(){
return new Queue(QUEUE_INFORM_EMAIL);
}
//声明QUEUE_INFORM_SMS队列
@Bean(QUEUE_INFORM_SMS)
public Queue QUEUE_INFORM_SMS(){
return new Queue(QUEUE_INFORM_SMS);
}
//ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey
@Bean
public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
}
//ROUTINGKEY_SMS队列绑定交换机,指定routingKey
@Bean
public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
}
}
生产者配置:
package com.iterge.iterge_pre.mq;
import com.iterge.iterge_pre.config.RabbitmqConfig;
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author liuph
* @date 2023/10/17 15:52:48
*/
@Component
public class ProducerService {
@Autowired
private RabbitMessagingTemplate mqTemplate;
public void sendMag(String msg){
mqTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM, "inform.email", msg);
}
}
消费者配置:
package com.iterge.iterge_pre.mq;
import com.iterge.iterge_pre.config.RabbitmqConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author liuph
* @date 2023/10/17 15:53:04
*/
@Component
public class ConsumerService {
//监听email队列
@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL})
public void receive_email(Message message, Channel channel){
String body = new String(message.getBody());
System.out.println("消费者:QUEUE_INFORM_EMAIL msg_"+body);
}
//监听sms队列
@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS})
public void receive_sms(Message message, Channel channel){
String body = new String(message.getBody());
System.out.println("消费者:QUEUE_INFORM_SMS msg_"+body);
}
}
创建Controller
package com.iterge.iterge_pre.controller;
import com.iterge.iterge_pre.entity.Response;
import com.iterge.iterge_pre.mq.ProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author liuph
* @date 2023/10/17 15:58:32
*/
@RestController
@RequestMapping("mq")
public class MQController {
@Autowired
private ProducerService producerService;
@GetMapping("send/{msg}")
public Response<String> send(@PathVariable(value = "msg") String mag){
producerService.sendMag(mag);
return Response.ok();
}
}
测试:
控制台信息:
如上,消费者成功消费到hi_rabbitmq
5.思考
所以文章读完了,开篇的问题思考的有结果了吗?
没有结果也没关系,下面我们做下总结:
1. 为什么你的项目中要用MQ呢?
这个问题要结合自己的业务场景来判断,比如多服务调用时需要应用解偶,高并发场景下提高需要保证服务的高可用,高性能等。
2.使用MQ为你解决了什么问题?
那MQ为我们解决什么问题呢,一般可以用6个字来概括:解偶、异步、削峰
解偶场景:比如当B、C系统需要用到A系统的数据时,A系统要分别调用B、C两端的接口就行数据传输,如果这是在来个D系统也需要A的数据,那么A系统还要改造代码,这时我们就可以把方案调整为通过MQ就行数据传输,A生产数据,别的系统需要数据之间消费就行了
异步场景:比如A系统需要调用B系统的接口进行业务操作,而B接口的业务逻辑又要用到C系统的接口,假如a自身业务处理耗时1s,a调b耗时2s,b调c耗时2秒,那整个流程耗时5s;
如果把方案调整为,a把数据放到MQ中,b系统消费后进行业务处理,假如a自身业务处理耗时1s,MQ耗时1s,共耗时2s,大大提高了业务处理能力
削峰场景:假如现在有1000w个用户请求a系统,所有请求的数据要存到数据库中,如果这些请求数据一股脑儿的都往数据库怼,那可能导致的结果是数据库顶不住,连接出现异常或者宕机,这种情况下改用MQ的方式,当数据进来时先把数据存的mq,b系统再慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。
3.当然解决问题的同时它又有哪些弊端值得注意?
而在解决上面的问题的同时,MQ也有一些弊端需要我们注意,由于链路变长,就有存在一些数据一致性的问题,比如数据丢失、重复消费、顺序消费等问题就出现。
本文章针对这些弊端问题暂时先不做讲述。