MQ-rabbitMQ_基础篇
- 1.MQ
- 1.1什么是MQ
- 1,2应用
- 2.常见消息中间件协议(模型)
- 2.1JMS模型(协议)
- 2.2AMQP协议
- 3.RabbitMQ
- 3.1六种工作模式
- 3.1.1Hello Word简单模式
- 3.1.2word queues 工作队列
- 能者多劳
- 3.1.3Publish/Subscribe 发布与订阅模式
- direct-routing
- topic
- fanout
- 3.1.4Routing 路由
- 3.1.5 Topics 主题
- 3.1.6RPC 远程调用模式(远程调用,不太算消息队列)
- 消息应答
- 4.springboot开发
- 生产者
- 消费者
1.MQ
1.1什么是MQ
MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是
message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常
见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不
用依赖其他服务。
1,2应用
- 异步处理
- 流量控制
- 日志
- 服务解耦
2.常见消息中间件协议(模型)
详细的解释说明在常见消息中间件大 PK
两者的区别和联系:
- JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
- JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
- JMS规定了两种消息模型;而AMQP的消息模型更加丰富
2.1JMS模型(协议)
JMS 全称 Java Message Service,类似于 JDBC,不同于 JDBC,JMS 是 JavaEE 的消息服务接口,JMS 主要有两个版本:1.1 和2.0,两者相比,后者主要是简化了收发消息的代码。
考虑到消息中间件是一个非常常用的工具,所以 JavaEE 为此制定了专门的规范 JMS。
不过和 JDBC 一样,JMS 作为规范,他只是一套接口,并不包含具体的实现,如果我们要使用 JMS,那么一般还需要对应的实现,这就像使用 JDBC 需要对应的驱动一样。
JMS 消息服务支持两种消息模型:
- 点对点或队列模型
- 发布/订阅模型
常见支持jms的mq:
- Kafka
- Apache ActiveMQ
- OpenJMS
2.2AMQP协议
2006 年,AMQP 规范发布。
RabbitMQ 的版本为 3.5.7,基于 AMQP 0-9-1。
常见实现了amqp的mq:
- Apache Qpid
- Apache ActiveMQ
- RabbitMQ
其实 ActiveMQ 不仅支持 JMS,也支持 AMQP
在 AMQP 协议中,消息收发涉及到如下一些概念:
-
Broker 接收和分发消息的应用,RabbitMQ服务就是Message Broker。
-
Virtual host 虚拟机,出于多租户和安全因素设计的,把 AMQP的基本组件划分到一个虚拟的分组中,可以类比mysql数据库会创建很多库,库和库之间是独立的。当多个不同的用户使用同一个RabbitMQserver 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等。
-
Connection 消息的发布方或者消息的消费方 和broker 之间的 TCP 连接。
-
Channel Channel 是在 connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method包含了channel id 帮助客户端和message broker 识别 channel,所以 channel之间是完全隔离的,减少了操作系统建立 TCP connection 的开销。
-
Exchange 交换机,message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。
-
Queue 队列,消息队列,接收消息、缓存消息,消息最终被送到这里等待 consumer 取走。
-
Binding 绑定,exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
另外还有大家熟知的阿里出品的 RocketMQ,这个是自定义了一套协议,社区也提供了 JMS,但是不太成熟
- ActiveMQ JMS和 AMQP
- RabbitMQ AMQP
- RocketMQ JMS 或者 自定义了一套协议
- Kafka JMS或者 自定义了一套协议
3.RabbitMQ
官方文档https://www.rabbitmq.com/getstarted.html
RabbitMQ就是 AMQP 协议的 Erlang 的实现(当然 RabbitMQ 还支持 STOMP2、 MQTT3 等协议 )
- Broker 接收和分发消息的应用,RabbitMQ服务就是Message Broker。
- Virtual host 虚拟机,出于多租户和安全因素设计的,把 AMQP的基本组件划分到一个虚拟的分组中,可以类比mysql数据库会创建很多库,库和库之间是独立的。当多个不同的用户使用同一个RabbitMQserver 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等。
- Connection 消息的发布方或者消息的消费方 和broker 之间的 TCP 连接。
- Channel Channel 是在 connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method包含了channel id 帮助客户端和message broker 识别 channel,所以 channel之间是完全隔离的,减少了操作系统建立 TCP connection 的开销。
- Exchange 交换机,message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。
- Queue 队列,消息队列,接收消息、缓存消息,消息最终被送到这里等待 consumer 取走。
- Binding 绑定,exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
思考:在java中 把它们都封装成对象,然后去使用。
3.1六种工作模式
3.1.1Hello Word简单模式
简单模式:一个生产者生产消息发送到队列里面,一个消费者从队列里面拿消息,进行消费消息。一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
纯java案例
/*HelloWord模型 生产者代码*/
public class Producer {
public static final String QUEUE_NAME = "hello"; // 队列名称
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.226.129"); // 设置MQ所在机器IP进行连接
factory.setPort(5672); // 指定MQ服务端口
factory.setVirtualHost("study"); // 指定使用的VirtualHost
factory.setUsername("admin"); // 指定MQ账号名
factory.setPassword("123"); // 指定MQ密码
Connection connection = factory.newConnection(); // 创建连接
Channel channel = connection.createChannel(); // 创建信道
/* 队列设置(创建队列)
*参数1:队列名称,名称不存在就自动创建
*参数2:定义队列是否持久化(重启MQ后是队列否存在),true开启,false关闭
*参数3:exclusive 是否独占队列(设置是否只能有一个消费者使用),true独占,false非独占
*参数4:autoelete 是否在消费完成后是否自动删除队列 ,true删除,false不删除
*参数5:额外附加参数
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello RabbitMQ"; // 需要发送的消息
/* 交换机&队列设置(指定消息使用的交换机和队列)
* 参数1: exchange交换机名称(简单队列无交换机,这里不写)
* 参数2: 有交换机就是路由key。没有交换机就是队列名称,意为往该队列里存放消息
* 参数3: 传递消息的额外设置 (设置消息是否持久化) MessageProperties.PERSISTENT_TEXT_PLAIN设置消息持久化
* 参数4: 消息具体内容(要为 Byte类型)
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
/*关闭资源*/
channel.close();
connection.close();
System.out.println("消息生产完毕");
}
}
/*HelloWord模型 消费者案例*/
public class Consumer {
public static final String QUEUE_NAME = "hello"; // 队列名称
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.226.129"); // 设置MQ所在机器IP进行连接
factory.setPort(5672); // 指定MQ服务端口
factory.setVirtualHost("study"); // 指定使用的VirtualHost
factory.setUsername("admin"); // 指定MQ账号名
factory.setPassword("123"); // 指定MQ密码
Connection connection = factory.newConnection(); // 创建连接
Channel channel = connection.createChannel(); // 创建信道
/*消费者成功消费时的回调接口,这里为打印获取到的消息*/
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
};
/*消费者取消消费的回调*/
CancelCallback callback = consumerTag -> {
System.out.println("消息者取消消费接口回调逻辑");
};
/* 消费消息
* 参数1 : 消费队列的名称
* 参数2 : 消息的自动确认机制(一获得消息就通知 MQ 消息已被消费) true打开,false关闭 (接收到消息并消费后也不通知 MQ ,常用)
* 参数3 : 消费者成功消费时的回调接口
* 参数4 : 消费者取消消费的回调
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, callback);
System.out.println("消费者执行完毕");
}
}
3.1.2word queues 工作队列
它是轮训分发消息的
/*WorkQueue模型 生产者代码*/
public class WorkProvider {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection(); //创建连接
Channel channel = connection.createChannel(); //创建信道
/* 队列设置(创建队列)
*参数1:队列名称,名称不存在就自动创建
*参数2:定义队列是否持久化(重启MQ后是队列否存在),true开启,false关闭
*参数3:exclusive 是否独占队列(设置是否只能有一个消费者使用),true独占,false非独占
*参数4:autoelete 是否在消费完成后是否自动删除队列 ,true删除,false不删除
*参数5:额外附加参数
*/
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
Scanner scanner = new Scanner(System.in); //从控制台输入消息内容
while (scanner.hasNext()){
String message = scanner.next();
/* 交换机&队列设置(指定消息使用的交换机和队列)
* 参数1: exchange交换机名称(简单队列无交换机,这里不写)
* 参数2: 有交换机就是路由key。没有交换机就是队列名称,意为往该队列里存放消息
* 参数3: 传递消息的额外设置 (设置消息是否持久化) MessageProperties.PERSISTENT_TEXT_PLAIN设置消息持久化
* 参数4: 消息具体内容(要为 Byte类型)
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("消息发送完成:" + message);
}
}
}
/*WorkQueue模型 消费者代码*/
public class Consumer1 {
private static final String QUEUE_NAME = "hello"; //队列名称
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection(); //创建连接
Channel channel = connection.createChannel(); //创建信道
/*消费者成功消费时的回调接口,这里为打印获取到的消息*/
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接收到的消息: "+ new String(message.getBody()) );
};
/*消费者取消消费的回调*/
CancelCallback callback = consumerTag -> {
System.out.println(consumerTag+"消息者取消消费接口回调逻辑");
};
System.out.println("消费者B等待接收消息......");
/* 消费消息
* 参数1 : 消费队列的名称
* 参数2 : 消息的自动确认机制(一获得消息就通知 MQ 消息已被消费) true打开,false关闭 (接收到消息并消费后也不通知 MQ ,常用)
* 参数3 : 消费者成功消费时的回调接口
* 参数4 : 消费者取消消费的回调
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,callback); //消费消息
}
}
/*WorkQueue模型 消费者代码*/
public class Consumer2 {
private static final String QUEUE_NAME = "hello"; //队列名称
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection(); //创建连接
Channel channel = connection.createChannel(); //创建信道
/*消费者成功消费时的回调接口,这里为打印获取到的消息*/
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接收到的消息: "+ new String(message.getBody()) );
};
/*消费者取消消费的回调*/
CancelCallback callback = consumerTag -> {
System.out.println(consumerTag+"消息者取消消费接口回调逻辑");
};
System.out.println("消费者C等待接收消息......");
/* 消费消息
* 参数1 : 消费队列的名称
* 参数2 : 消息的自动确认机制(一获得消息就通知 MQ 消息已被消费) true打开,false关闭 (接收到消息并消费后也不通知 MQ ,常用)
* 参数3 : 消费者成功消费时的回调接口
* 参数4 : 消费者取消消费的回调
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,callback); //消费消息
}
}
能者多劳
Work模型的使用:
- 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量
在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
3.1.3Publish/Subscribe 发布与订阅模式
(direct、topic、fanout 、 headers 【不常用】)
一个生产者生产消息发送到交换机里面,由交换机处理消息,队列与交换机的任意绑定,将消息指派给某个队列,一个或者多个消费者从队列里面拿消息,进行消费消息。需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。
Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
生产者发送给交换机和路由key就可以了,消费者声明队列和绑定队列
direct-routing
消费者:channel.queueBind(queue,EXCHANGE_NAME,NormalKey);
/*Direct模式 生产者代码*/
public class Provider {
private static final String EXCHANGE_NAME = "DirectExchange"; //交换机名称
private static final String VipKey = "Vip"; //普通VIP
private static final String NormalKey = "Normal"; //普通用户
private static final String SuperVipKey = "SuperVip"; //超级VIP
public static void main(String[] args) throws Exception {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //声明交换机
/*模拟不同会员等级接收到的不同消息内容*/
for (int i = 0; i < 9; i++) {
String message = "当前是待发送的消息,序号:"+i;
if(i%3==0){
channel.basicPublish(EXCHANGE_NAME,SuperVipKey,null,message.getBytes(StandardCharsets.UTF_8)); //发送超级VIP消息
}
if(i%3==1){
channel.basicPublish(EXCHANGE_NAME,VipKey,null,message.getBytes(StandardCharsets.UTF_8)); //发送VIP消息
}
if(i%3==2){
channel.basicPublish(EXCHANGE_NAME,NormalKey,null,message.getBytes(StandardCharsets.UTF_8)); //发送通用消息
}
System.out.println("消息发送: " + message);
}
}
}
/*Direct模式 消费者代码*/
public class VipConsumer {
private static final String EXCHANGE_NAME = "DirectExchange"; //交换机名称
private static final String NormalKey = "Normal"; //普通用户
private static final String VipKey = "Vip"; //普通VIP
public static void main(String[] args) throws Exception {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //声明交换机,由于生产者已经创建该交换机,如果生产者先执行,这行实际可以省略不写
String queue = channel.queueDeclare().getQueue(); //创建临时队列
channel.queueBind(queue,EXCHANGE_NAME,VipKey); //绑定队列和交换机
channel.queueBind(queue,EXCHANGE_NAME,NormalKey); //绑定队列和交换机
/*消费者成功消费回调逻辑*/
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("Vip用户接收到的信息为:"+new String(message.getBody()));
channel.basicAck(message.getEnvelope().getDeliveryTag(),false); //手动消息应答
};
/*消费者取消消费回调逻辑*/
CancelCallback cancelCallback = a->{
System.out.println("Vip用户进行取消消费操作!");
};
channel.basicConsume(queue,false,deliverCallback,cancelCallback);
}
}
/*Direct模式 消费者代码*/
public class NormalConsumer {
private static final String EXCHANGE_NAME = "DirectExchange"; //交换机名称
private static final String NormalKey = "Normal"; //普通用户
public static void main(String[] args) throws Exception {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //声明交换机,由于生产者已经创建该交换机,如果生产者先执行,这行实际可以省略不写
String queue = channel.queueDeclare().getQueue(); //创建临时队列
channel.queueBind(queue,EXCHANGE_NAME,NormalKey); //绑定队列和交换机
/*消费者成功消费回调逻辑*/
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("普通用户接收到的信息为:"+new String(message.getBody()));
channel.basicAck(message.getEnvelope().getDeliveryTag(),false); //手动消息应答
};
/*消费者取消消费回调逻辑*/
CancelCallback cancelCallback = a->{
System.out.println("普通用户进行取消消费操作!");
};
channel.basicConsume(queue,false,deliverCallback,cancelCallback);
}
}
topic
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
fanout
- Fanout:广播,将消息交给所有绑定到交换机的队列
3.1.4Routing 路由
3.1.5 Topics 主题
3.1.6RPC 远程调用模式(远程调用,不太算消息队列)
消息应答
类型 代码 说明
自动应答
basicConsume中 autoAck 设置为 true
如果应答后的代码出现异常导致回滚,该消息由于已被消费无法找回
手动应答
Channel.basicAck
确认处理消息
手动应答
Channel.basicNack
否认处理消息(支持批量)
手动应答
Channel.basicReject
否认处理消息(不支持批量)如果队列配置了死信交换机将会发送到死信队列中,未配置则进行丢弃操作
/*消费者成功消费回调逻辑*/
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("消费者A对消息进行消费!");
try {
TimeUnit.SECONDS.sleep(1); //模拟实际业务操作
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者A接收到的信息为:"+new String(message.getBody()));
/*
* 参数一: 消息标记tag
* 参数二: 是否批量消费消息(true为应答该队列中所有的消息,false为只应答接收到的消息)
* */
channel.basicAck(message.getEnvelope().getDeliveryTag(),false); //手动消息应答
};
/*消费者成功消费回调逻辑*/
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("消费者A对消息进行消费!");
try {
TimeUnit.SECONDS.sleep(1); //模拟实际业务操作
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者A接收到的信息为:"+new String(message.getBody()));
/*
* 参数一: 消息标记tag
* 参数二: 是否批量消费消息(true为应答该队列中所有的消息,false为只应答接收到的消息)
* 参数三: 是否重回队列
* */
channel.basicNack(message.getEnvelope().getDeliveryTag(),false, false); //拒绝消息应答方法1
/*
* 参数一: 消息标记tag
* 参数二: 是否重回队列
* */
channel.basicReject(message.getEnvelope().getDeliveryTag(),false); //拒绝消息应答方法2
};
4.springboot开发
生产者
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring.rabbitmq.host=192.168.226.132
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.virtual-host=springboot
另一种详细的:
/* RabbitMQ配置文件 */
@Configuration
public class RabbitConfig {
@Autowired
private RabbitProperties properties;
/*RabbitMQ连接池,从配置文件读取参数*/
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setHost(properties.getHost());
cachingConnectionFactory.setPort(properties.getPort());
cachingConnectionFactory.setUsername(properties.getUsername());
cachingConnectionFactory.setPassword(properties.getPassword());
cachingConnectionFactory.setVirtualHost(properties.getVirtualHost());
return cachingConnectionFactory;
}
/* RabbitTemplate配置 */
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory()); //让RabbitTemplate使用连接池
return rabbitTemplate;
}
/*创建交换机*/
@Bean
public DirectExchange directExchange(){
return new DirectExchange("TemplateDirectEx",false,false);
}
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("TemplateFanoutEx",false,false);
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("TemplateTopicEx",false,false);
}
/*创建队列*/
@Bean
public Queue directQueue1(){
return new Queue("directQueue1",true);
}
@Bean
public Queue directQueue2(){
return new Queue("directQueue2",true);
}
@Bean
public Queue topicQueue1(){
return QueueBuilder.durable("topicQueue1").build();
}
@Bean
public Queue topicQueue2(){
return QueueBuilder.durable("topicQueue2").build();
}
/*创建绑定关系方法一*/
@Bean
public Binding directBind1(){
return new Binding("directQueue1", Binding.DestinationType.QUEUE,
"TemplateDirectEx","WeiXin",null);
}
@Bean
public Binding directBind2(){
return BindingBuilder.bind(new Queue("directQueue2",false))
.to(new DirectExchange("TemplateDirectEx"))
.with("WeiXin");
}
/*创建绑定关系方法二
* 将Bean方法名称作为参数代入*/
@Bean
public Binding topicBind1(@Qualifier("topicQueue1") Queue queue,
@Qualifier("topicExchange") TopicExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("user.#");
}
@Bean
public Binding topicBind2(@Qualifier("topicQueue2") Queue queue,
@Qualifier("topicExchange") TopicExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("vip.*");
}
/* 消息容器SimpleMessageListenerContainer 配置*/
@Bean
public SimpleMessageListenerContainer messageListenerContainer(CachingConnectionFactory cachingConnectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory); //设置连接池
container.setQueues(topicQueue1(),topicQueue2(),directQueue1(),directQueue2()); //设置队列
container.setConcurrentConsumers(1); //消费者数量
container.setMaxConcurrentConsumers(10); //最大消费者
container.setDefaultRequeueRejected(false); //是否设置重回队列,一般都为false,相当于 channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
container.setAcknowledgeMode(AcknowledgeMode.AUTO); //消息应答方式,自动/手动/拒绝
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
}); //消费端的标签策略,每个消费端都有独立的标签,可在控制台的 channel > consumer 中查看 对应tag
/* 消息监听器方法一 实际用消息适配器
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("消费者的消息"+new String(message.getBody() ));
}
}); */
/*消息监听器方法二 使用消息适配器 方案一,通用适配模式
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumerMessage"); //自定义消息处理方法名称
adapter.setMessageConverter(new MyMessageConverter()); //添加消息转换器
container.setMessageListener(adapter);
*/
/*消息监听器方法二 使用消息适配器 方案二,指定不同的队列使用不同的监听方法
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setMessageConverter(new MyMessageConverter()); //添加消息转换器
adapter.setDefaultListenerMethod("consumerMessage"); //消息适配器默认监听方法名称
Map<String,String> queueOrTagToMethodName = new HashMap<>();
queueOrTagToMethodName.put("directQueue1","method1");
queueOrTagToMethodName.put("directQueue2","method2");
adapter.setQueueOrTagToMethodName(queueOrTagToMethodName); //队列标识与方法名称组成的集合
container.setMessageListener(adapter);
*/
/*使用默认的JSON格式转换器,消息需要使用Map进行接收
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumerMessage"); //消息适配器默认监听方法名称
Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter();
adapter.setMessageConverter(jsonMessageConverter);
container.setMessageListener(adapter);
*/
/*使用默认的JSON格式转换器,消息转换为具体的Java对象,需要使用对象进行接收
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumerMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
javaTypeMapper.addTrustedPackages("*"); //允许使用所有包进行转换,默认会使用 java核心类进行转换
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
*/
/*使用默认的JSON格式转换器,消息转换为具体的Java对象,需要使用对象进行接收,支持多映射
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumerMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
Map<String,Class<?>> idClassMap = new HashMap<>(); //创建Map进行多映射指定,KEY为名称,value为类全路径
idClassMap.put("student",com.dmbjz.entity.Student.class);
idClassMap.put("packaged",com.dmbjz.entity.Packaged.class);
javaTypeMapper.setIdClassMapping(idClassMap);
javaTypeMapper.addTrustedPackages("*"); //允许使用所有包进行转换,默认会使用 java核心类进行转换
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
*/
/*多类型消息转换器,不同消息类型使用不同类型转换器进行转换*/
MessageListenerAdapter adapter =new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("extComsumeMessage");
ContentTypeDelegatingMessageConverter converter = new ContentTypeDelegatingMessageConverter(); //复杂消息转换器
TextMessageConverter textConvert = new TextMessageConverter(); //文本转换器
converter.addDelegate("text",textConvert);
converter.addDelegate("html/text",textConvert);
converter.addDelegate("xml/text",textConvert);
converter.addDelegate("text/plain",textConvert);
Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter(); //JSON转换器
converter.addDelegate("json",jsonConverter);
converter.addDelegate("application/json",jsonConverter);
ImageMessageConverter imageConverter = new ImageMessageConverter(); //图片转换器
converter.addDelegate("image/png",imageConverter);
converter.addDelegate("image",imageConverter);
PDFMessageConverter pdfConverter = new PDFMessageConverter(); //PDF转换器
converter.addDelegate("application/pdf",pdfConverter);
adapter.setMessageConverter(converter);
container.setMessageListener(adapter);
return container;
}
}
@SpringBootTest
class RabbitMqBootRabbitTemplateApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
/*RabbitTemplate的API案例*/
@Test
public void testTemplate(){
/*创建消息,可以指定消息具体参数*/
MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put("desc","请求头desc参数信息描述");
messageProperties.getHeaders().put("type","请求头type参数信息描述");
messageProperties.setContentType("application/json"); //发送格式
messageProperties.setContentEncoding("UTF-8"); //UTF-8格式化
/*封装消息
* 参数一:消息内容
* 参数二:消息配置
*/
Message message = new Message("这是RabbitTemplate消息".getBytes(StandardCharsets.UTF_8),messageProperties);
/* MessagePostProcessor:发送消息前的消息拦截器
* 可以对消息参数进行修改,例如设置优先级、请求头等
*/
rabbitTemplate.convertAndSend("TemplateDirectEx", "WeiXin", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
System.out.println("拦截需要发送的消息并进行二次设置");
message.getMessageProperties().getHeaders().put("desc","请求头desc参数信息修改");
message.getMessageProperties().getHeaders().put("attr","请求头额外新加attr参数");
return message;
}
});
/*创建消息,使用链式调用*/
Message message2 = MessageBuilder.withBody("这是Template消息2".getBytes(StandardCharsets.UTF_8))
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setMessageId("消息ID:"+ UUID.randomUUID())
.setContentEncoding("UTF-8")
.setHeader("desc","额外修改的信息描述")
.setHeader("info","请求头参数2")
.build();
rabbitTemplate.convertAndSend("TemplateTopicEx", "user.student", message2);
/*最简单的调用方式*/
//rabbitTemplate.convertAndSend("TemplateTopicEx", "vip.student", "我是最简单的消息!");
rabbitTemplate.send("TemplateTopicEx", "user.teacher.aa", message2);
}
/*发送消息使用JSON格式转换器*/
@Test
public void testSendJsonMessage() throws JsonProcessingException {
Student student = new Student().setId("001").setName("小明").setContent("一年级一班学生");
/*使用ObjectMapper将消息转换为JSON数据,换成FastJSON也可以*/
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(student);
System.out.println("需要发送的消息内容:" + json);
MessageProperties properties = new MessageProperties();
properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
Message message = new Message(json.getBytes(StandardCharsets.UTF_8),properties);
rabbitTemplate.convertAndSend("TemplateTopicEx","vip.man",message);
}
/*发送消息使用JSON格式转换器 转换为 Java实体类*/
@Test
public void testSendJavaMessage() throws JsonProcessingException {
Student student = new Student().setId("001").setName("小明").setContent("一年级一班学生");
/*使用ObjectMapper将消息转换为JSON数据,换成FastJSON也可以*/
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(student);
System.out.println("需要发送的消息内容:" + json);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.getHeaders().put("__TypeId__","com.dmbjz.entity.Student"); //key为固定值,value为需要转换对象的全路径
Message message = new Message(json.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("TemplateTopicEx","vip.man",message);
}
/*发送消息使用JSON格式转换器 转换为 Java实体类 多映射*/
@Test
public void testSendJavaMessage2() throws JsonProcessingException {
/*实体类一发送消息*/
Student student = new Student().setId("001").setName("小明").setContent("一年级一班学生");
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(student);
System.out.println("需要发送的Student消息内容:" + json);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.getHeaders().put("__TypeId__","student"); //key为固定值,value为多映射配置的实体类名称
Message message = new Message(json.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("TemplateTopicEx","vip.man",message);
/*实体类二发送消息*/
Packaged packaged = new Packaged().setIds("002").setPname("dmbjz").setPdesc("打包内容");
String json2 = mapper.writeValueAsString(packaged);
System.out.println("需要发送的Package消息内容:" + json2);
messageProperties.getHeaders().put("__TypeId__","packaged"); //key为固定值,value为 idClassMap 的 Key
Message message2 = new Message(json2.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("TemplateTopicEx","user.man",message2);
}
/*发送消息使用多类型消息转换器*/
@Test
public void testSendExtConverterMessage() throws Exception {
/*发送图片文件*/
byte[] body = Files.readAllBytes(Paths.get("E:/图片/头像", "ludashi.png"));
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("image/png");
messageProperties.getHeaders().put("extName", "png");
Message message = new Message(body, messageProperties);
rabbitTemplate.convertAndSend("TemplateDirectEx", "WeiXin", message);
/*发送PDF文件*/
byte[] body2 = Files.readAllBytes(Paths.get("E:/", "rabbitmq.pdf"));
MessageProperties messageProperties2 = new MessageProperties();
messageProperties2.setContentType("application/pdf");
Message message2 = new Message(body2, messageProperties2);
rabbitTemplate.convertAndSend("TemplateDirectEx", "WeiXin", message2);
}
`
}
消费者
另一种详细的:
@Configuration
public class RabbitMQConfig {
@Autowired
private RabbitProperties properties;
public static final String EXCHANGE_NAME = "CustomListener-Exchange";
public static final String QUEUE_NAME = "CustomListener-Queue";
public static final String Routing_Key = "CustomListener-Key";
/*RabbitMQ连接池,从配置文件读取参数*/
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setHost(properties.getHost());
cachingConnectionFactory.setPort(properties.getPort());
cachingConnectionFactory.setUsername(properties.getUsername());
cachingConnectionFactory.setPassword(properties.getPassword());
cachingConnectionFactory.setVirtualHost(properties.getVirtualHost());
cachingConnectionFactory.setPublisherReturns(properties.isPublisherReturns()); //开启连接池的ReturnCallBack支持
return cachingConnectionFactory;
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(EXCHANGE_NAME,false,false,null);
}
@Bean
public Queue queue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
@Bean
public Binding binding(@Qualifier("queue") Queue queue,
@Qualifier("directExchange") DirectExchange directExchange
){
return BindingBuilder.bind(queue).to(directExchange).with(Routing_Key);
}
/* 设置自定义监听方法 */
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory,
OrderMessageService orderMessageService){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue());
container.setExposeListenerChannel(true); //是否将监听器交给 ChannelAwareMessageListener,低版本SpringBoot需要手动开启
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener(orderMessageService); //对队列使用自定义监听器方法进行处理
return container;
}
}
/* 自定义消息监听,可以编写公共消息应答方法 */
@Slf4j
public abstract class AbstractMessageListener implements ChannelAwareMessageListener {
public abstract void receviceMessage(Message message);
/* 获取到的队列消息执行抽象方法,抽象方法实际落地就是 OrderMessageService 的 receviceMessage */
@Override
public void onMessage(Message message, Channel channel) throws IOException, InterruptedException {
MessageProperties messageProperties = message.getMessageProperties();
long deliveryTag = messageProperties.getDeliveryTag();
log.info("收到消息{}: ", message);
try{
receviceMessage(message);
channel.basicAck(deliveryTag , false); //同意应答
}catch (Exception e){
log.error(e.getMessage(), e);
/* 这里可以根据消息具体的参数判断是否应该拒绝重回队列 */
if (message.getBody().length>100){
channel.basicReject(deliveryTag, false);
} else {
channel.basicNack(deliveryTag, false, true);
}
}
}
}
/* 自定义消息监听器的消息处理方法,一般为具体的业务处理逻辑 */
@Slf4j
@Service
public class OrderMessageService extends AbstractMessageListener {
@Override
public void receviceMessage(Message message) {
log.info("OrderMessageService获取到消息:{}", new String(message.getBody()));
}
}