第12章 消息服务
- 12.1 JMS_ActiveMQ
- 1. 简介
- 2. ActiveMQ安装
- Linux安装命令
- 问题1:网页访问不了
- 问题2: 修改密码
- 3. 整合SpringBoot
- 3.1 依赖
- 3.2 配置
- 3.3 JmsComponent 组件
- 3.4 测试
- 12.2 AMQP_RabbitMQ
- 1. 简介
- 2. RabbitMQ
- 2.1 Erlang环境安装(略)
- 2.2 安装RabbitMQ(❤❤❤❤)
- 2.3 启动RabbitMQ
- 3. 整合SpringBoot
- 3.1 依赖
- 3.2 配置文件
- 3.3 交换机Exchange配置
- 3.4 Direct:绑定策略 (默认) ❤❤❤❤
- 定义交换机、消息队列
- 消费者
- 生产者
- 3.5 Fanout: 订阅模式 ❤❤❤❤
- 创建Fanout交换机、多个消息队列
- 消费者
- 生产者
- 测试
- 3.6 Topic: 主题模式 ❤❤❤❤
- 创建交换机、消息队列
- 消费者
- 生产者
- 测试
- 单一消费
- 多消费
- 3.7 Header策略 ❤
- 交换机配置
- 消费者
- 生产者
- 测试
- 测试:给name=admin先生发生消息
- 测试:给name=root先生发生消息
- 测试:给other=yh先生发生消息
- 测试whereAll
- ******************************************************
12.1 JMS_ActiveMQ
1. 简介
2. ActiveMQ安装
Linux安装命令
//下载
wget http://mirrors.hust.edu.cn/apache/activemq/5.15.16/apache-activemg-5.15.16-bin.tar.gz
//解压
Tar -zxvf apache-activemg-5.15.16-bin.tar.gz
//启动
cd apache-activemq-5.15.16
cd bin/
./activemq start
//关闭
./activemq stop
ps -ef |grep activemq //查看组件状态
./activemq console //查询运行日志
chkconfig activemq on //设置成开机启动
问题1:网页访问不了
环境如果安装了rabbitMQ,两个公用了端口5627
解决方法
转到activemq的安装目录conf
进入conf文件夹,打开activemq.xml
找到下面这行代码,修改里面的端口(把原来原来的5672改成5673)
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>**
问题2: 修改密码
找到conf/jetty-realm.properties文件
3. 整合SpringBoot
3.1 依赖
Spring Boot为ActiveMQ配置提供了相关的“Starter”,因此整合非常容易。首先创建SpringBoot项目,添加ActiveMQ依赖,代码如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
然后在 application.properties 中进行连接配置,代码如下:
3.2 配置
spring.activemq.broker-url=tcp://192.168.66.129:61616
spring.activemq.packages.trust-all=true
spring.activemq.user=admin
spring.activemq.password=admin
或
spring:
activemq:
broker-url: tcp://118.31.72.136:61616
packages:
trust-all: true
user: admin
password: Abc1234%
3.3 JmsComponent 组件
package com.ruoyi.common.core.mq;
import com.ruoyi.common.core.domain.MessageMQ;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.jms.Queue;
/**
* jms,activeMQ组件
*/
@Component
public class JmsComponent {
/**
* amq消息对列
*
* @return
*/
@Bean
Queue queueAMQ() {
return new ActiveMQQueue("amq");
}
/**
* 自定义队列:系统消息
*
* @return
*/
@Bean
Queue queueSystem() {
return new ActiveMQQueue("amq_sys");
}
@Resource
private JmsMessagingTemplate messagingTemplate;
@Resource
private Queue queueAMQ;
@Resource
private Queue queueSystem;
/**
* 生产者1
*
* @param message
*/
public void send(MessageMQ message) {
messagingTemplate.convertAndSend(this.queueAMQ, message);
}
/**
* 生产者2
*
* @param message
*/
public void sendToSys(MessageMQ message) {
messagingTemplate.convertAndSend(this.queueSystem, message);
}
/**
* 消费者:消息对列amq监听器
*
* @param message
*/
@JmsListener(destination = "amq")
public void listenerByAmq(MessageMQ message) {
System.out.println("ActiveMQ::amq::" + message);
}
/**
* 消费者:消息对列amq_sys监听器
*
* @param message
*/
@JmsListener(destination = "amq_sys")
public void listenerBySys(MessageMQ message) {
System.out.println("ActiveMQ::amq_sys::" + message);
}
}
3.4 测试
/**
* 测试JMS的activemq服务
*
* @param message
* @return
*/
@GetMapping("/activemq")
public AjaxResult activemq(MessageMQ message) {
message.setDate(new Date());
jmsComponent.send(message);
return AjaxResult.success();
}
/**
* 测试JMS的activemq服务
*
* @param message
* @return
*/
@GetMapping("/activemqToSys")
public AjaxResult activemqToSys(MessageMQ message) {
message.setDate(new Date());
jmsComponent.sendToSys(message);
return AjaxResult.success();
}
12.2 AMQP_RabbitMQ
1. 简介
2. RabbitMQ
2.1 Erlang环境安装(略)
Erlang wget 下载地址
# 下载安装包
wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-22.3.1-1.el7.x86_64.rpm/download.rpm
yum localinstall erlang-22.3.1-1.el7.x86_64.rpm
cd otp_src_21.0
#编译
./otp_src_21.0
./configure
make
# 安装
make install
# 检验
erl
# Erlang添加到yum源
vi /etc/yum.repos.d/rabbitmq-erlang.repo
添加内容
[rabbitmq-erlang]
name=rabbit-erlang
baseurl=https://dl.bintray.com/rabbitmq/rpm/erlang/21/el/7
gpgcheck=1
gpgkey=https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
repo_gpgcheck=0
enabled=1
# 清除缓存,创建新缓存
yum clean all
yum makecache
2.2 安装RabbitMQ(❤❤❤❤)
安装
2.3 启动RabbitMQ
3. 整合SpringBoot
3.1 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.2 配置文件
项目创建成功后,在 application.yml
中配置 RabbitMQ 基本连接信息,代码如下:
spring:
rabbitmq:
host: 118.31.72.136
port: 5672
username: admin
password: Abc1234%
virtual-host: /
listener:
simple:
#消费者数量
concurrency: 50
#消费者允许最大数量
max-concurrency: 10000
#每次从堆里默认一条链接
prefetch: 1
#消费者自动启动
auto-startup: true
3.3 交换机Exchange配置
1、概念说明:
Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
2、消息队列的使用过程大概如下:
(1)客户端连接到消息队列服务器,打开一个channel。
(2)客户端声明一个exchange,并设置相关属性。
(3)客户端声明一个queue,并设置相关属性。
(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。
(5)客户端投递消息到exchange。
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符 号”#”匹配一个或多个词,符号””匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.”只匹配”abc.def”。还 有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
RabbitMQ中,所有生产者提交的消息都由Exchange来接受,然后Exchange按照特定的策略转发到Queue进行存储
RabbitMQ提供了四种Exchange:fanout,direct,topic,header
header模式在实际使用中较少,本文只对前三种模式进行比较。
性能排序:fanout > direct >> topic。比例大约为11:10:6
3.4 Direct:绑定策略 (默认) ❤❤❤❤
定义交换机、消息队列
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RMQ:Direct交换机
*/
@Configuration
public class RabbitDirectConfig {
public final static String DIRECTNAME = "admin-direct";
public static final String HELLO_QUEUE= "hello-queue";
@Bean
Queue queueDirect() {
return new Queue(HELLO_QUEUE);
}
/**
* 设置交换机类型及配置信息
*
* @return
*/
@Bean
DirectExchange directExchange() {
//交换机名字,重启后是否有效,长期未用是否删除
return new DirectExchange(DIRECTNAME, true, false);
}
/**
* 将queueDirect消息对列与directExchange交换机绑定
*
* @return
*/
@Bean
Binding binding() {
return BindingBuilder.bind(queueDirect())
.to(directExchange())
.with(HELLO_QUEUE);
}
}
消费者
import com.ruoyi.common.config.RabbitDirectConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* RabbitMQ_Direct:消费者服务
*/
@Component
public class DirectReceiver {
/**
* 监听system消息队列
*
* @param msg
*/
@RabbitListener(queues = {RabbitDirectConfig.HELLO_QUEUE})
public void listenerHQ(String msg) {
System.out.println("DirectReceiver消费者::" + msg);
}
}
生产者
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.ruoyi.common.config.RabbitDirectConfig;
import com.ruoyi.common.core.domain.MessageMQ;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* rabbitmq消息队列生产者
*
* @author zld
* @since 2022-11-21
*/
@Slf4j
@Component
public class RabbitmqProducer {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 发送消息到交换机DirectReceiver
*
* @param message
*/
public void sendDirect(MessageMQ message) {
String mes = JSONUtil.toJsonStr(message);
log.info("+++++++++++++++++++++ message = {}", mes);
rabbitTemplate.convertAndSend(RabbitDirectConfig.HELLO_QUEUE, mes);
}
}
@Resource
private RabbitmqProducer rabbitmqProducer;
/**
* 测试AMQP的RabbitMq服务
*
* @param message
* @return
*/
@GetMapping("/rabbirmq")
public AjaxResult rabbirmq(MessageMQ message) {
rabbitmqProducer.sendPowerInfo(message);
return AjaxResult.success();
}
3.5 Fanout: 订阅模式 ❤❤❤❤
简而言之:就是生产者将消息发送到交换机,该交换机转发到其下所有队列中
创建Fanout交换机、多个消息队列
给该交换机绑定两个队列,便于演示订阅模式
package com.ruoyi.common.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 订阅模式
*/
@Configuration
public class RabbitFanoutConfig {
/**
* 交换机名称
*/
public final static String FANOUTNAME = "admin-fanout";
public final static String FANOUTQUEUE = "fanout-queue";
public final static String FANOUTTWOQUEUE = "fanout-two-queue";
/**
* 定义交换机
*
* @return
*/
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUTNAME, true, false);
}
@Bean
public Queue queueFanout() {
return new Queue(FANOUTQUEUE);
}
/**
* 交换机绑定队列1
*
* @return
*/
@Bean
Binding binding1() {
return BindingBuilder.bind(queueFanout()).to(fanoutExchange());
}
@Bean
public Queue queueFanoutTwo() {
return new Queue(FANOUTTWOQUEUE);
}
/**
* 交换机绑定队列2
*
* @return
*/
@Bean
Binding binding2() {
return BindingBuilder.bind(queueFanoutTwo()).to(fanoutExchange());
}
}
消费者
import cn.hutool.json.JSONUtil;
import com.ruoyi.common.config.RabbitFanoutConfig;
import com.ruoyi.common.core.domain.MessageMQ;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* Fanout订阅模式:消费者
*/
@Component
public class FanoutReceiver {
@RabbitListener(queues = {RabbitFanoutConfig.FANOUTQUEUE})
private void RabbitListener1(String msg) {
MessageMQ bean = JSONUtil.toBean(msg, MessageMQ.class);
System.out.println("消费者1::订阅模式:" + bean.toString());
}
@RabbitListener(queues = {RabbitFanoutConfig.FANOUTTWOQUEUE})
private void RabbitListener2(String msg) {
MessageMQ bean = JSONUtil.toBean(msg, MessageMQ.class);
System.out.println("消费者2::订阅模式:" + bean.toString());
}
}
生产者
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.ruoyi.common.config.RabbitDirectConfig;
import com.ruoyi.common.config.RabbitFanoutConfig;
import com.ruoyi.common.core.domain.MessageMQ;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* rabbitmq消息队列生产者
*
* @author zld
* @since 2022-11-21
*/
@Slf4j
@Component
public class RabbitmqProducer {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 订阅模式:发送消息到交换机Fanout
*
* @param message
*/
public void sendFanout(MessageMQ message) {
String mes = JSONUtil.toJsonStr(message);
//注意::::这里第一个参数变成指定订阅模式的交换机名称,第二个参数不在指定routingkey(即消息队列)设置为null
rabbitTemplate.convertAndSend(RabbitFanoutConfig.FANOUTNAME, null, mes);
}
}
测试
@Resource
private RabbitmqProducer rabbitmqProducer;
/**
* 测试AMQP_RabbotMQ服务
*
* @param message
* @return
*/
@GetMapping("/rabbitmq")
public AjaxResult rabbitmq(MessageMQ message) {
message.setDate(DateUtil.nextWeek());
rabbitmqProducer.sendFanout(message);
return AjaxResult.success();
}
3.6 Topic: 主题模式 ❤❤❤❤
TopicExchange
是比较复杂也比较灵活的一种路由策略。
在TopicExchange
中, Queue通过routingkey绑定到TopicExchange上,当消息到达TopicExchange后,TopicExchange根据消息的routingkey将消息路由到一个或者多Queue 上。
创建交换机、消息队列
TopicExchange 如下:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 主题式交换机
*/
@Configuration
public class RabbitTopicConfig {
//交换机名称
public final static String TOPICNAME = "admin-topic";
public final static String XIAOMI_QUEUE = "xiaomi";
public final static String HUAWEI_QUEUE = "huawei";
public final static String PHONE_QUEUE = "phone";
//1. 创建主题交换机
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPICNAME, true, false);
}
//2. 创建消息队列
@Bean
public Queue xiaomi() {
return new Queue(XIAOMI_QUEUE);
}
@Bean
public Queue huawei() {
return new Queue(HUAWEI_QUEUE);
}
@Bean
public Queue phone() {
return new Queue(PHONE_QUEUE);
}
//3. 消息队列绑定交换机,其中with方法参数配置很重要,灵活的体现
@Bean
Binding xiaomiBinding() {
return BindingBuilder.bind(xiaomi()).to(topicExchange()).with(XIAOMI_QUEUE + ".#");
}
@Bean
Binding huaweiBinding() {
return BindingBuilder.bind(huawei()).to(topicExchange()).with(HUAWEI_QUEUE + ".#");
}
@Bean
Binding phoneBinding() {
return BindingBuilder.bind(phone()).to(topicExchange()).with("#." + PHONE_QUEUE + ".#");
}
}
消费者
import com.ruoyi.common.config.RabbitTopicConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消费者:主题模式
*/
@Component
public class TopicReceiver {
//消费phone队列
@RabbitListener(queues = {RabbitTopicConfig.PHONE_QUEUE})
public void phoneConsumer(String msg) {
System.out.println("消费者::主题模式::phoneConsumer::" + msg);
}
//消费小米
@RabbitListener(queues = {RabbitTopicConfig.XIAOMI_QUEUE})
public void xiaomiConsumer(String msg) {
System.out.println("消费者::主题模式::xiaomiConsumer::" + msg);
}
//消费华为
@RabbitListener(queues = {RabbitTopicConfig.HUAWEI_QUEUE})
public void huaweiConsumer(String msg) {
System.out.println("消费者::主题模式::huaweiConsumer::" + msg);
}
//消费小米和华为
@RabbitListener(queues = {RabbitTopicConfig.HUAWEI_QUEUE, RabbitTopicConfig.XIAOMI_QUEUE})
public void mhConsumer(String msg) {
System.out.println("消费者::主题模式::米华Consumer::" + msg);
}
}
生产者
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.ruoyi.common.config.RabbitDirectConfig;
import com.ruoyi.common.config.RabbitFanoutConfig;
import com.ruoyi.common.config.RabbitTopicConfig;
import com.ruoyi.common.core.domain.MessageMQ;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* rabbitmq消息队列生产者
*
* @author zld
* @since 2022-11-21
*/
@Slf4j
@Component
public class RabbitmqProducer {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 主题模式:发送消息到交换机Topic
*
* @param message
*/
public void sendTopic(MessageMQ message) {
//参数1:指定交换机,参数2:主题路径,参数3:消息1内容
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME, message.getTopic(), message.getMessage());
}
}
测试
@Resource
private RabbitmqProducer rabbitmqProducer;
/**
* 测试AMQP_RabbotMQ服务
*
* @param message
* @return
*/
@GetMapping("/rabbitmq")
public AjaxResult rabbitmq(MessageMQ message) {
System.out.println("**********主题模式**********");
System.out.println("消息主题::" + message.getTopic());
rabbitmqProducer.sendTopic(message);
return AjaxResult.success();
}
单一消费
测1:小米消费者
测2:华为消费者
测3:手机消费者
多消费
测4:小米+手机,消费者
测5:华为+手机,略
测6:华为+小米
3.7 Header策略 ❤
交换机配置
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* Header交换机
*/
@Configuration
public class RabbitHeaderConfig {
public final static String HEADER_NAME = "admin-header";
public final static String NAME_QUEUE = "name_queue";
public final static String NK_QUEUE = "nk_queue";
public final static String AGE_QUEUE = "age_queue";
@Bean
HeadersExchange headersExchange() {
return new HeadersExchange(HEADER_NAME, true, false);
}
@Bean
public Queue queueNk() {
return new Queue(NK_QUEUE);
}
@Bean
public Queue queueName() {
return new Queue(NAME_QUEUE);
}
@Bean
public Queue queueAge() {
return new Queue(AGE_QUEUE);
}
@Bean
Binding bindingNk() {
Map<String, Object> map = new HashMap<>();
map.put("nk1", "admin1");
map.put("nk2", "admin2");
//whereAll 所有的键值对都匹配才能转发消息
return BindingBuilder.bind(queueNk()).to(headersExchange()).whereAll(map).match();
}
@Bean
Binding bindingName() {
Map<String, Object> map = new HashMap<>();
map.put("name", "root");
map.put("other", "yh");
//只要有键值对匹配就能转发消息
return BindingBuilder.bind(queueName()).to(headersExchange()).whereAny(map).match();
}
@Bean
Binding bindingAge() {
return BindingBuilder.bind(queueAge()).to(headersExchange()).where("age").exists();
}
}
消费者
注意,这里的参数用 byte 数组接收
package com.ruoyi.common.core.mq;
import com.ruoyi.common.config.RabbitHeaderConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* Header消费者
*/
@Component
public class HeaderReceiver {
@RabbitListener(queues = {RabbitHeaderConfig.NAME_QUEUE})
private void header1(byte[] msg) {
System.out.println("header1::name_queue::" + new String(msg, 0, msg.length));
}
@RabbitListener(queues = {RabbitHeaderConfig.AGE_QUEUE})
private void header2(byte[] msg) {
System.out.println("header2::age_queue::" + new String(msg, 0, msg.length));
}
@RabbitListener(queues = {RabbitHeaderConfig.NK_QUEUE})
private void header3(byte[] msg) {
System.out.println("header3::nk_queue::" + new String(msg, 0, msg.length));
}
}
生产者
消息的发送和 routingkey
无关
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.ruoyi.common.config.RabbitDirectConfig;
import com.ruoyi.common.config.RabbitFanoutConfig;
import com.ruoyi.common.config.RabbitHeaderConfig;
import com.ruoyi.common.config.RabbitTopicConfig;
import com.ruoyi.common.core.domain.MessageMQ;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageBuilderSupport;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* rabbitmq消息队列生产者
*
* @author zld
* @since 2022-11-21
*/
@Slf4j
@Component
public class RabbitmqProducer {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* Header模式:发送消息到交换机
*
* @param message
*/
public void sendHeader(MessageMQ message) {
Message msg = MessageBuilder
.withBody(message.getMessage().getBytes())
.setHeader(message.getType(), message.getTypeVal())
.build();
rabbitTemplate.send(RabbitHeaderConfig.HEADER_NAME, null, msg);
}
}
测试
测试:给name=admin先生发生消息
发送到了交换机,但是没有消费者消费
在bindingName方法中没有配置name=admin
测试:给name=root先生发生消息
测试:给other=yh先生发生消息
测试whereAll
heard交换机消息队列绑定
@Bean
Binding bindingNk() {
Map<String, Object> map = new HashMap<>();
map.put("nk1", "admin1");
map.put("nk2", "admin2");
//whereAll 所有的键值对都匹配才能转发消息
return BindingBuilder.bind(queueNk()).to(headersExchange()).whereAll(map).match();
}
生产者
public void sendHeaderAll(MessageMQ message) {
Message msg = MessageBuilder
.withBody(message.getMessage().getBytes())
.setHeader(message.getTypeAll1(), message.getTypeValAll1())
.setHeader(message.getTypeAll2(), message.getTypeValAll2())
.build();
rabbitTemplate.send(RabbitHeaderConfig.HEADER_NAME, null, msg);
}
测试api
@GetMapping("/rabbitmq")
public AjaxResult rabbitmq(MessageMQ message) {
System.out.println("**********Header模式**********");
System.out.println("消息类型1::" + message.getTypeAll1() + ":" + message.getTypeValAll1());
System.out.println("消息类型2::" + message.getTypeAll2() + ":" + message.getTypeValAll2());
rabbitmqProducer.sendHeaderAll(message);
return AjaxResult.success();
}
测试结果,成功消费