目录
1. MQ是什么,有哪些作用?
2. 主要的MQ框架有哪些?
3. RabbitMQ安装
4. RabbitMQ中的主要概念
5. 消息队列的核心概念
6. 一个简单的生产者和消费者示例。
6.1 消息发送者模块
1. MQ是什么,有哪些作用?
消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。是程序与程序之间传递信息的一种方式。
作用:
- 异步: 一般适用于耗时但非核心业务的处理,即:程序无需等待非核心业务处理完成,即可进行下一步操作,有点像电路中的并行电路
- 解耦:通过引入消息总线,将模块之间的直接的调用,转化为消息的传递。
- 消峰:对于来不及及时处理的消息,可以暂存于消息队列里排队。
缺点:
- 可用性降低:系统加了对消费队列服务的依赖,依赖越多的系统越脆弱。
- 复杂度提高:加入消息队列,如果消息有重复如何处理? 消息丢失如何处理?
- 一致性问题:方法的调用者不需等待执行者的返回,如果执行者失败,则容易造成状态不一致。
2. 主要的MQ框架有哪些?
MQ框架非常之多,比较流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的RocketMQ。本文主要介绍RabbitMq。
RabbitMQ是以AMQP协议实现的一种消息中间件产品。AMQP是Advanced Message Queuing Protocol的简称,是一个面向消息中间件的开放式标准应用层协议。
常用消息队列对照表:
3. RabbitMQ安装
1)下载rabbitmq镜像
docker pull rabbitmq:management
使用management标签是下载包含管理控制台的版本,在下载镜像时没有指定版本号,则会到docker仓库中下载最新的镜像版本。
2)运行镜像
##方式一:默认guest用户,密码也是guest
$ docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
方式二
$ docker run -d \
--name my-rabbitmq \
-p 5672:5672 -p 15672:15672 \
-v /data:/var/lib/rabbitmq \
--hostname my-rabbitmq-host \
-e RABBITMQ_DEFAULT_VHOST=my_vhost \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--restart=always \
rabbitmq:management
参数说明:
-d:后台运行容器
-name:指定容器名
-p:指定服务运行的端口(5672:应用访问端口;15672:控制台Web端口号)
-v:映射目录或文件,启动了一个数据卷容器,数据卷路径为:/var/lib/rabbitmq,再将此数据卷映射到住宿主机的/data目录
--hostname:主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名)
-e:指定环境变量;(
RABBITMQ_DEFAULT_VHOST:默认虚拟机名;
RABBITMQ_DEFAULT_USER:默认的用户名;
RABBITMQ_DEFAULT_PASS:默认用户名的密码)
--restart=always:当Docker重启时,容器能自动启动
rabbitmq:management:镜像名
注1:RABBITMQ_DEFAULT_VHOST=my_vhost,my_vhost名字请记好,在之后的编程中要用到,
如果启动时没指定,默认值为/
#4.进入RabbitMQ管理平台进行相关操作
注1:容器启动后,可以通过docker logs 窗口ID/容器名字 查看日志
docker logs my-rabbitmq
注2:停止并删除所有容器
docker stop $(docker ps -aq) && docker rm $(docker ps -aq)
docker ps
4)开放端口
firewall-cmd --zone=public --add-port=15672/tcp --permanent # 开放15672端口
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --reload # 配置立即生效
5.运行测试
http://192.168.164.128:15672/#/
4. RabbitMQ中的主要概念
RabbitMQ是实现了高级消息服务队列协议(AMQP)的开源消息代理软件,也称为面向消息的中间件。使用erlang语言开发。AMQP是Advanced Message Queuing Protocol的简称,是面向消息中间件的开放式标准应用协议,这样意味着RabbitMQ可以更容易的构建异构的系统。
1)Server(Broker) 服务器(或叫经纪人)
可以理解为消息队列服务器的主体,接收客户端连接,实现AMQP协议的消息队列和路由功能的进程
2) Virtual Host 虚拟机
虚拟主机的概念,它是对Broker的虚拟划分,将消息生产者,消费者,和它们依赖的AMQP相关的结构进行隔离,一般是从安全方面的考虑。类似权限控制组,一个Virtual Host里可以有多个Exchange和Queue。类似于一个数据库实例中可以创建多个数据库的概念。每个Virtual Host都是个独立的。
3)Exchange 交换机
交换机,接收生产者发送的消息,并根据Routing Key将消息路由到服务器中的队列Queue。交换机分为:fanout、direct、topic三种类型
4) Queue 队列
消息队列,用于存储还未被消费者消费的消息,用于存放业务数据
5)Message 消息
由Header和body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、优先级是多少、由哪个Message Queue接收等;body是真正需要发送的数据内容;
在业务系统中代码真正的业务数据
6)Binding Key 绑定关键字
绑定关键字,将一个特定的Exchange和一个特定的Queue绑定起来
7)Routing key:路由关键字,Exchange根据这个关键字进行消息投递
8)connection:连接,代理生产者,消费者、Broker之间通信的物理网络。
9)Channel:消息通道,用于连接生产者和消费者的逻辑结构。在客户端的每个连接里可以建立多个Channel,每个Channel代表一个会话,通过Channel可以隔离同一个连接中的不同交互内容。
10)Producer: 消息的生产者,制造消息并发送消息的程序
11)Consumer:消息消费者,接收并处理消息的程序。
5. 消息队列的核心概念
消息队列中的核心概念包括以下几个:生产者、队列、消费者、消息 。
生产者生产消息并投递到队列中,
消费者可以从队列中获取消息并消费,
消息指的是各个服务之间要传递的数据。
6. 一个简单的生产者和消费者示例。
本示例包含一个消息生产者,队列(RabbitMQ),消息消费者。消息生产者生产消息,并将消费发送到队列服务器,消息消费者监听并消费队列中的消息。
备注:本示例通过在原有的springcloud示例项目中加入新模块来演示RabbitMQ的基本使用。
6.1 消息发送者模块
1)引入必要的依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2)项目配置文件
server.port=8080
## rabbitmq config
spring.rabbitmq.host=192.168.199.144
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=my_vhost
3)启动类
/**
* @author Administrator
* @create 2020-02-1518:53
*/
@SpringBootApplication
public class SenderApp {
public static void main(String[] args) {
SpringApplication.run(SenderApp.class, args);
}
}
4) RabbitMQ配置类
/**
* @author Administrator
* @create 2020-02-1519:04
*/
@Configuration
@Slf4j
public class RabbitMQConfig {
@Bean
public Queue helloQueue() {
return new Queue("hello");
}
}
5)编写服务,用于向队列中存放 消息
package com.zking.rabbit.rabbitmqdemo.controller;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@RestController
public class ProviderController {
@Autowired
private AmqpTemplate amqpTemplate;
@RequestMapping("/send")
public String send(){
amqpTemplate.convertAndSend("hello","这是 生产向队列hello投递消息 时间:"+ LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
return "success";
}
}
运行项目测试
刷星一次页面 数量随着而变化
6.编写消费者,消费消息
package com.zking.rabbit.rabbitmqdemo.component;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = {"hello"})
public class Consumer {
@RabbitHandler
public void handler(String msg){
System.out.println("消费者接收 -- "+ msg);
}
}
重启查看效果