一、消息队列(Rabbit Message Queue)
1、概念
消息队列是一种应用之间的通信方式,消息发送之后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只发布消息到MQ,消息使用者值从MQ中拿消息,两者不知道对方的存在。
简单来说:MQ是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。
2、MQ的特性
应用解耦,异步提速,削峰填谷。
系统的耦合度越高,容错性就越低,难维护。使用 MQ 使得应用间解耦,提升容错性和可维护性
二、MQ的安装
1、安装依赖环境
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
2、安装Erlang(https://www.erlang.org/downloads)
下载erlang的tar包,上传至虚拟机,进行解压
tar -zxvf otp_src_19.3.tar.gz
3、安装RabbitMQ(https://www.rabbitmq.com/download.html)
rpm -ivh --nodeps rabbitmq-server-3.7.2-1.el7.noarch.rpm
4、开启管理界面及配置
开启管理界面
rabbitmq-plugins enable rabbitmq_management4
修改默认配置信息
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
5、RabbitMQ的启动和关闭
启动服务:service rabbitmq-server start
停止服务:service rabbitmq-server stop
重启服务 :service rabbitmq-server restart
cd /usr/share/doc/rabbitmq-server-3.6.5/
cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
6、用户角色
RabbitMQ 安装成功后使用默认用户名 guest 登录账号: guest 密码:guest
7、用户角色
添加用户: rabbitmqctl add_user {username} {password}rabbitmqctl add_user root root删除用户: rabbitmqctl delete_user {username}修改密码: rabbitmqctl change_password {username} {newpassword}rabbitmqctl change_password root 123456设置用户角色: rabbitmqctl set_user_tags {username} {tag}rabbitmqctl set_user_tags root administrator
超级管理员(administrator):
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)
普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
三、RabbitMQ消息发送与接收
1、消息的模式
P:生产者——>创建消息,然后发布到队列 、C:消费者 ——>获得消息
红框框queue:消息队列——>可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
2、高级消息队列协议(AMQP)
它是一个网络协议,是应用层协议 的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端或中间件不同产品,不同的开发语言等条件的限制。
Exchange :交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Binding :绑定,用于消息队列和交换器之间的关联。exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到exchange 中的查询表中,用于message 的分发依据。
Queue 消息队列,用来保存消息直到发送给消费者,消息最终被送到这里等待 consumer 取走
Connection :网络连接,比如一个 TCP 连接。
Channel: 信道,多路复用连接中的一条独立的双向数据流通道,Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯、AMQP method 包含了channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。
Virtual Host: 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,连接时必须指定RabbitMQ 默认的 vhost 是 / 。
Broker :接收和分发消息的应用
四、RabbitMQ工作模式
1. 简单模式
2. 工作队列模式 Work Queue
3. 发布订阅模式 Publish/subscribe
4. 路由模式 Routing
5. 通配符模式 Topic
需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。
五、SpringBoot整合RabbitMQ
1、导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、编写配置文件
spring:
rabbitmq:
host: 192.168.88.99 #主机ip
port: 5672 #端口
username: guest
password: guest
virtual-host: /
3、定义监听类,使用@RabbitListener注解完成队列监听。
消费者consumer
@Component
public class RabbimtMQListener {
@RabbitListener(queues = "bootQueues")
public void ListenerQueue(Message message){
//System.out.println(message);
System.out.println(new String(message.getBody()));
}
}
生产者
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "bootTopicExchange";
public static final String QUEUE_NAME = "bootQueues";
//1.交换机
@Bean("bootExchange")
public Exchange bootExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
//2.Queue 队列
@Bean("bootQueue")
public Queue bootQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
//3. 队列和交互机绑定关系 Binding
/*
1. 知道哪个队列
2. 知道哪个交换机
3. routing key
*/
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}
测试类
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
//1.注入RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSend(){
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha","boot mq hello~~~");
}
}