文章目录
- 消息队列
- 概述
- 两大种类
- RabbitMQ
- 安装及基操
- Docker中安装
- 添加用户
- 创建Virtual Hosts
- 设置权限
- 添加交换机
- 创建队列
- 交换机绑定队列
- 五种消息模型
- SpringBoot整合MQ
- 引入依赖
- properties配置
- 开启RabbitMQ
- API使用
- 创建交换机
- 创建队列
- 交换机绑定队列
- 发送消息
- 接收消息
- 消息确认机制
- 延时队列
消息队列
概述
两大种类
RabbitMQ
安装及基操
Docker中安装
下载镜像:docker pull rabbitmq:management
创建实例并启动:
docker run -d --name rabbitmq --publish 5671:5671 \
--publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 \
rabbitmq:management
- 4369 – erlang发现口
- 5672 --client端通信口
- 15672 – 管理界面ui端口
- 25672 – server间内部通信口
在web浏览器中输入地址:http://服务器ip:15672/
输入默认账号: guest : guest
overview:概览
connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况
channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。
Exchanges:交换机,用来实现消息的路由
Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。
端口:
5672: rabbitMq的编程语言客户端连接端口
15672:rabbitMq管理界面端口
25672:rabbitMq集群的端口
添加用户
如果不使用guest,我们也可以自己创建一个用户:
1、 超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
2、 监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
3、 策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
4、 普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
5、 其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
创建Virtual Hosts
虚拟主机:类似于mysql中的database。他们都是以“/”开头
设置权限
添加交换机
创建队列
交换机绑定队列
绑定成功
五种消息模型
RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。那么也就剩下5种。
但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同。
- 点对点
- 多人监听
- 主题模式
- 字符串精确匹配传输消息的广播
- 字符串支持通配符匹配传输消息的广播
SpringBoot整合MQ
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
properties配置
# ip
spring.rabbitmq.host=192.168.11.130
# 端口
spring.rabbitmq.port=5672
# virtualHost
spring.rabbitmq.virtual-host=/
# 开启发送端消息抵达Broker确认
spring.rabbitmq.publisher-confirms=true
# 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
# 只要消息抵达Queue,就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true
# 手动ack消息,不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
开启RabbitMQ
// 开启RabbitMQ支持
@EnableRabbit
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
API使用
创建交换机
创建队列
交换机绑定队列
发送消息
用JSON发送含有对象的消息,需要自定义配置类
@Configuration
public class MyRabbitConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
接收消息
消息确认机制
RabbitMQ配置类
@Configuration
public class MyRabbitConfig {
private RabbitTemplate rabbitTemplate;
@Primary
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setMessageConverter(messageConverter());
initRabbitTemplate();
return rabbitTemplate;
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* 定制RabbitTemplate
* 1、服务收到消息就会回调
* 1、spring.rabbitmq.publisher-confirms: true
* 2、设置确认回调
* 2、消息正确抵达队列就会进行回调
* 1、spring.rabbitmq.publisher-returns: true
* spring.rabbitmq.template.mandatory: true
* 2、设置确认回调ReturnCallback
*
* 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
*
*/
// @PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法
public void initRabbitTemplate() {
/**
* 1、只要消息抵达Broker就ack=true
* correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
* ack:消息是否成功收到
* cause:失败的原因
*/
//设置确认回调
rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
});
/**
* 只要消息没有投递给指定的队列,就触发这个失败回调
* message:投递失败的消息详细信息
* replyCode:回复的状态码
* replyText:回复的文本内容
* exchange:当时这个消息发给哪个交换机
* routingKey:当时这个消息用哪个路邮键
*/
rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
});
}
}
延时队列
相当于定时任务
@Configuration
@Slf4j
public class MyMqConfig {
/* 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ)不存在的情况下 */
/* RabbitMq里面如果有,即使属性发生变化,也不会覆盖
/*
* @Description 延时消息队列-死信队列
* @Author WSKH
*/
@Bean
public Queue orderDelayQueue() {
/*
Queue(String name, 队列名字
boolean durable, 是否持久化
boolean exclusive, 是否排他
boolean autoDelete, 是否自动删除
Map<String, Object> arguments) 属性
*/
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "order-event-exchange");
arguments.put("x-dead-letter-routing-key", "order.release.order");
// 消息过期时间 1分钟
arguments.put("x-message-ttl", 60000);
return new Queue(MqConstants.ORDER_DELAY_QUEUE, true, false, false, arguments);
}
/**
* @Description 普通队列-延时队列死亡后,消息经过exchange送给普通队列,接收者负责删除订单
* @Author WSKH
*/
@Bean
public Queue orderReleaseQueue() {
return new Queue(MqConstants.ORDER_RELEASE_ORDER_QUEUE, true, false, false);
}
/**
* @Description 创建order交换机
* @Author WSKH
*/
@Bean
public Exchange orderEventExchange() {
/*
* String name,
* boolean durable,
* boolean autoDelete,
* Map<String, Object> arguments
* */
return new TopicExchange(MqConstants.ORDER_EVENT_EXCHANGE, true, false);
}
/**
* @Description 延时队列和order交换机的绑定(创建订单)
* @Author WSKH
*/
@Bean
public Binding orderCreateBinding() {
/*
* String destination, 目的地(队列名或者交换机名字)
* DestinationType destinationType, 目的地类型(Queue、Exhcange)
* String exchange,
* String routingKey,
* Map<String, Object> arguments
* */
return new Binding(MqConstants.ORDER_DELAY_QUEUE,
Binding.DestinationType.QUEUE,
MqConstants.ORDER_EVENT_EXCHANGE,
"order.create.order",
null);
}
/**
* @Description 普通队列和order交换机的绑定(负责释放到时间还没有支付的订单)
* @Author WSKH
*/
@Bean
public Binding orderReleaseBinding() {
return new Binding(MqConstants.ORDER_RELEASE_ORDER_QUEUE,
Binding.DestinationType.QUEUE,
MqConstants.ORDER_EVENT_EXCHANGE,
"order.release.order",
null);
}
}