一、引言
1、、消息队列
Ⅰ、什么是消息队列?
消息队列是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数。也就是说:消息的发送者和接收者不需要同时与消息队列交互。消息会保存在队列中,直到接收者取回它。
Ⅱ、Message queue 释义
服务之间最常见的通信方式是直接调用彼此来通信,消息从一端发出后立即就可以达到另一端,称为即时消息通讯(同步通信) 消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送给另一端,称为延迟消息通讯(异步通信)
Ⅲ、消息队列特点
- 异步性:消息队列允许发送者发送消息后继续其它任务,而不需要等待接收者的响应。这种异步性能够提高系统的吞吐量和响应速度。
- 解耦性:消息队列将发送者和接收者解耦,使它们不需要知道彼此的存在。发送者只需将消息发送到队列中,而不需要关心谁将接收该消息。接收者通过订阅队列来获取消息,而不需要关心消息的发送者。这种解耦性能够提高系统的可扩展性和可维护性。
- 可靠性:消息队列通常具有持久化机制,可以确保消息的可靠性传输。即使在发送者或接收者出现故障的情况下,消息也不会丢失。此外,消息队列还具有消息重试和消息确认等机制,确保消息能够被正确处理。
- 消息类型和格式:消息队列中的消息是有类型的,并且具有格式。这使得消息可以被按类型读取,并遵循一定的格式。
- 多进程支持:消息队列允许一个或多个进程向它写入或者读取消息。
- 数据持久性:当从消息队列中读出消息后,消息队列中对应的数据都会被删除,确保数据不会重复消费。
- 分布式特性:通过对消费者的横向扩展,降低了消息队列阻塞的风险,以及单个消费者产生单点故障的可能性。
Ⅳ、好处
- 高吞吐量:由于消息的传输速度比普通的文件快,所以能够实现高吞吐量。
- 支持异步操作:一个线程在处理完自己的任务之后,可以把结果发送到另一个线程。
- 支持并发操作:多个线程可以同时处理消息队列中的消息。
- 支持分布式系统:由于消息队列的解耦特性,分布式系统中的组件可以独立扩展。
- 提供数据持久化机制:消息队列可以将数据进行持久化,确保数据不会因为处理过程中的失败而丢失。
- 提供错误恢复机制:当系统的一部分组件失效时,消息队列可以保证即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
- 提高响应速度:通过异步处理,消息队列可以显著提高系统的响应速度。
- 解耦:在项目启动之初来预测将来项目会碰到什么需求是极其困难的。使用消息队列可以在处理过程中间插入一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
- 扩展性:由于消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。
- 灵活性:使用消息队列能够使关键组件顶住增长的访问压力,而不是因为超出负荷的请求而完全崩溃。
- 可恢复性:当体系的一部分组件失效时,不会影响到整个系统。
- 送达保证:消息队列提供的冗余机制保证了消息能被实际的处理,只要一个进程读取了该队列即可。
- 排序保证:在许多情况下,数据处理的顺序都很重要。消息队列能保证数据会按照特定的顺序来处理。
- 缓冲:在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行——写入队列的处理会尽可能的快速,而不受从队列读的预备处理的约束。该缓冲有助于控制和优化数据流经过系统的速度。
- 理解数据流:在一个分布式系统里,要得到一个关于用户操作会用多长时间及其原因的总体印象,是个巨大的挑战。消息系列通过消息被处理的频率,来方便的辅助确定那些表现不佳的处理过程或领域,这些地方的数据流都不够优化。
- 异步通信:很多时候,你不想也不需要立即处理消息。消息队列提供了异步处理机制,允许你把一个消息放入队列,但并不立即处理它。你想向队列中放入多少消息就放多少,然后在你乐意的时候再去处理它们。
2、消息队列相关
Ⅰ、AMQP
一个提供统一消息服务的应用层标准高级消息队列协议,是一个通用的应用层协议 消息发送与接受的双方遵守这个协议可以实现异步通讯.这个协议约定了消息的格式和工作方式.
技术选型
3、什么是RabbitMQ
RabbitMQ是一个开源的消息队列系统,使用Erlang语言开发,基于AMQP(高级消息队列协议)实现。它最初起源于金融系统,用于在分布式系统中存储和转发消息。RabbitMQ的主要特性包括易用性、扩展性、高可用性以及可靠性。消息队列(MQ)是一种应用程序对应用程序的通信方法,应用程序通过读写出入队列的消息来通信,而无需专用连接来链接它们。在事件驱动(发布-订阅)架构中,RabbitMQ扮演着Broker的角色。
- Server(Broker):接收客户端连接,实现AMQP协议的消息队列和路由功能的进程.
- Virtual Host:虚拟主机的概念,类似权限控制组,一个Virtual Host里可以有多个Exchange和Queue.
- Exchange:交换机,接收生产者发送的消息,并根据Routing Key将消息路由到服务器中的队列Queue.
- ExchangeType:交换机类型决定了路由消息行为,RabbitMQ中有三种类型Exchange,分别是fanout、direct、topic.
- Message Queue:消息队列,用于存储还未被消费者消费的消息.
- Message:由Header和body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、优先级是多少、由哪个Message Queue接收等.body是真正需要发送的数据内 容.
- BindingKey:绑定关键字,将一个特定的Exchange和一个特定的Queue绑定起来.
二、快速入门
1、Docker安装部署RabbitMQ
【注意】获取镜像的时候要获取management版本的,不要获取last版本的,management版本的才带有管理界面
docker pull rabbitmq:management
--hostname:主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名)
-e:指定环境变量:
RABBITMQ_DEFAULT_VHOST:默认虚拟机名
RABBITMQ_DEFAULT_USER:默认的用户名
RABBITMQ_DEFAULT_PASS:默认用户名的密码docker run -d \ --name 容器名\ -p 5672:5672 -p 15672:15672 \ -v /home/rabbitmq:/var/lib/rabbitmq \ --hostname my-rabbitmq-host \ -e RABBITMQ_DEFAULT_VHOST=my_vhost \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=admin \ --restart=always \ rabbitmq:management
容器启动后,查看容器日志
docker logs 容器名
开启防火墙
systemctl start firewalld
开放端口
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent
更新防火墙规则
firewall-cmd --reload
Ⅰ、配置用户
进入管理后台
密码:admin
账号:admin
http://ip:15672
创建用户
点进用户进行分配
2、springboot连接配置
Ⅰ、新建项目
新建一个空项目,里面新建两个spring boot模块,并且导入依赖
接收者(publisher)消费者(consumer)
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
Ⅱ、配置yml
spring:
rabbitmq:
host: 0.0.0.0 #虚拟机开启的IP地址
username: spring #创建的用户
password: 123456 #用户的密码
port: 5672
virtual-host: my_vhost
Ⅲ、案例一
接收一个string类型
生产者
import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @SuppressWarnings("all") public class RabbitConfig { @Bean public Queue firstQueue() { // 创建一个名为firstQueue的队列 return new Queue("firstQueue"); } }
使用Controller
@RestController @SuppressWarnings("all") public class SenderController { // 自动装配rabbitTemplate @Autowired private AmqpTemplate rabbitTemplate; // 发送消息到firstQueue队列 @RequestMapping("/sendFirst") public String sendFirst() { // 将消息转换并发送到firstQueue队列 rabbitTemplate.convertAndSend("firstQueue", "Hello World"); return "🐉"; } }
访问http://localhost:8888/sendFirst
可以看到消息队列中有了一个
消费者
@Component @SuppressWarnings("all") @Slf4j @RabbitListener(queues = "firstQueue") public class Receiver { @RabbitHandler public void process(String msg) { log.warn("接收到:" + msg); } }
运行之后可以看到我们发送的消息
Ⅳ、案例二
我们接受一个实体类
生产者(publisher)
新建一个实体类User 实现接口Serializable
@SuppressWarnings("all") @Data @AllArgsConstructor @NoArgsConstructor public class User implements Serializable { private String username; private String userpwd; }
在RabbitConfig 类里面添加一个队列
@Bean public Queue secondQueue() { // 创建一个名为 secondQueue 的队列 return new Queue("secondQueue"); }
完整代码 RabbitConfig
@Configuration @SuppressWarnings("all") public class RabbitConfig { @Bean public Queue firstQueue() { // 创建一个名为firstQueue的队列 return new Queue("firstQueue"); } @Bean public Queue secondQueue() { // 创建一个名为 secondQueue 的队列 return new Queue("secondQueue"); } }
在Controller 层添加方法
// 自动装配ObjectMapper @Autowired private ObjectMapper objectMapper; // 发送消息到secondQueue队列 @RequestMapping("/send2") public String send2() throws JsonProcessingException { // 创建一个User对象 User wfzldr = new User("wfzldr", "1234567890"); // 将User对象转换为json字符串 String json = objectMapper.writeValueAsString(wfzldr); // 将消息转换并发送到firstQueue队列 // 将消息转换并发送到secondQueue队列 rabbitTemplate.convertAndSend("secondQueue", json); return "🐉"; }
完整代码SenderController
@RestController @SuppressWarnings("all") public class SenderController { // 自动装配rabbitTemplate @Autowired private AmqpTemplate rabbitTemplate; // 自动装配ObjectMapper @Autowired private ObjectMapper objectMapper; // 发送消息到firstQueue队列 @RequestMapping("/sendFirst") public String sendFirst() { // 将消息转换并发送到firstQueue队列 rabbitTemplate.convertAndSend("firstQueue", "Hello World"); return "🐉"; } // 发送消息到secondQueue队列 @RequestMapping("/send2") public String send2() throws JsonProcessingException { // 创建一个User对象 User wfzldr = new User("wfzldr", "1234567890"); // 将User对象转换为json字符串 String json = objectMapper.writeValueAsString(wfzldr); // 将消息转换并发送到firstQueue队列 // 将消息转换并发送到secondQueue队列 rabbitTemplate.convertAndSend("secondQueue", json); return "🐉"; } }
消费者(consumer)
新建一个接受实体的Receiver类
@Component @SuppressWarnings("all") @Slf4j @RabbitListener(queues = "secondQueue") public class EntityReceiver { @Autowired private ObjectMapper objectMapper; @RabbitHandler public void process(String json) throws JsonProcessingException { User user = objectMapper.readValue(json, User.class); log.warn("接收到:" + user); } }
我破门也需要在消费者里添加实体
@SuppressWarnings("all") @Data @AllArgsConstructor @NoArgsConstructor public class User implements Serializable { private String username; private String userpwd; }
运行两个方法
3、创建公共模块
在项目里面新建一个publi的公共模块,在里面写公共的实体类
在公共模块的pom.xml文件里面把打包方式war改成 jar
在对应的消费者或者生产者里面引入对应的模块即可
<!-- 引入公共模块--> <dependency> <groupId>org.example</groupId> <artifactId>public</artifactId> <version>1.0-SNAPSHOT</version> </dependency>
我的分享就到这里,欢迎大家在评论区留言!