介绍
RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。
RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。
使用场景
1.服务解耦:生产者与消费者之间不是直接调用的,中间存在消息中间件,生产者不需要关心消费者的调用情况
2.流量削峰:在高并发的情况下,系统服务没法及时处理大量的并发请求,此时可以把并发请求发送消费队列中,消费者从队列获取请求并处理,从而减少系统的压力
3.异步调用:任务之间可以异步执行,从而减少整体执行时长
基本组成
PRODUCER生产者、COMSUMER消费者、Exchange交换机、Message Queue消息队列、Binding Key普通键、Routing Key路由键、Broker中间件(包含队列与交换机)
其中:PRODUCER生产者负责发送消息
COMSUMER消费者负责响应消息
yml文件配置
生产者:
spring:
application:
name: rabbitmq-server
#RabbitMQ
rabbitmq:
#ip
host: 192.168.17.128
#用户名
username: rabbitmq
#密码
password: rabbitmq
#端口
port: 5673
#虚拟主机名,默认/
virtual-host: rabbitmq
消费者:
spring:
application:
name: rabbitmq-consumer
#RabbitMQ
rabbitmq:
#ip
host: 192.168.17.128
#用户名
username: rabbitmq
#密码
password: rabbitmq
#端口
port: 5673
#虚拟主机名
virtual-host: rabbitmq
listener:
simple:
#同一时间抓取的数量,待处理完再抓取
prefetch: 1
#设置手动签收,默认自动签收
acknowledge-mode: manual
简单模式
一个生产者,一个消费者
生产者代码
声明队列
/**
* 简单模式队列
* @return
*/
@Bean
public Queue simpleQueue(){
//持久化 非独占 非自动删除
return QueueBuilder.durable("simpleQueue").build();
}
发送消息
private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 组装消息
* @param msg
* @return
*/
private static Map<String, Object> createMsg(Object msg) {
String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
Map<String,Object> message= Maps.newHashMap();
message.put("sendTime",sdf.format(new Date()));
message.put("msg", msg);
message.put("msgId",msgId);
return message;
}
@GetMapping("simple")
@ApiOperation("简单模式")
public String simple(@RequestParam String msg){
Map<String, Object> map = createMsg(msg);
rabbitTemplate.convertAndSend("simpleQueue",map);
return "ok";
}
消费者代码
/**
* 简单模式的消费者
* @param message 消息
* @param c 通道
* @param msg 消息内容
* @throws IOException
*/
//使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致
@RabbitListener(queuesToDeclare = @Queue(value = "simpleQueue"))
public void simple(Message message,Channel c,Map msg) throws IOException {
MessageProperties properties = message.getMessageProperties();
long tag = properties.getDeliveryTag();
log.info("简单模式的消费者收到:{}",msg);
//由于在yml设置手动回执,此处需要手动回执,不批量签收,回执后才能处理下一批消息
c.basicAck(tag,false);
}
工作模式
一个生产者向队列发送消息,多个消费者从同一个队列取消息
是否手动回执ACk,合理分发QOS,消息持久化
生产者代码
声明队列
/**
* 工作模式队列
* @return
*/
@Bean
public Queue workQueue(){
return QueueBuilder.durable("workQueue").build();
}
发送消息
private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 组装消息
* @param msg
* @return
*/
private static Map<String, Object> createMsg(Object msg) {
String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
Map<String,Object> message= Maps.newHashMap();
message.put("sendTime",sdf.format(new Date()));
message.put("msg", msg);
message.put("msgId",msgId);
return message;
}
@GetMapping("work")
@ApiOperation("工作模式")
public String work(){
for (int i = 0; i <100; i++) {
rabbitTemplate.convertAndSend("workQueue",createMsg(i),message -> {
MessageProperties messageProperties = message.getMessageProperties();
//默认消息持久化,设置消息不持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
return message;
});
}
return "ok";
}
消费者代码
/**
* 工作模式的消费者1
* @param message 消息
* @param c 通道
* @param msg 消息内容
* @throws IOException
*/
//使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致
@RabbitListener(queuesToDeclare = @Queue(value = "workQueue"))
public void work1(Message message,Channel c,Map msg) throws IOException {
MessageProperties properties = message.getMessageProperties();
long tag = properties.getDeliveryTag();
log.info("工作模式的消费者1收到:{}",msg);
//手动回执,不批量签收,回执后才能处理下一批消息
c.basicAck(tag,false);
}
/**
* 工作模式的消费者2
* @param message 消息
* @param c 通道
* @param msg 消息内容
* @throws IOException
*/
//使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致
@RabbitListener(queuesToDeclare = @Queue(value = "workQueue"))
public void work2(Message message,Channel c,Map msg) throws IOException {
MessageProperties properties = message.getMessageProperties();
long tag = properties.getDeliveryTag();
log.info("工作模式的消费者2收到:{}",msg);
//手动回执,不批量签收,回执后才能处理下一批消息
c.basicAck(tag,false);
}
发布订阅模式
生产者通过fanout扇出交换机群发消息给消费者,同一条消息每一个消费者都可以收到
方法1:生产者创建交换机,消费者创建队列与监听队列
生产者代码
创建交换机
/**
* fanout交换机
* @return
*/
@Bean
public Exchange fautExchange1(){
//持久化 非自动删除
return ExchangeBuilder.fanoutExchange("fanout").build();
}
//创建初始化RabbitAdmin对象
@Bean
public RabbitAdmin fanoutRabbitAdmin1(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
rabbitAdmin.setAutoStartup(true);
rabbitAdmin.declareExchange(fanoutExchange1());
return rabbitAdmin;
}
发送消息
private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 组装消息
* @param msg
* @return
*/
private static Map<String, Object> createMsg(Object msg) {
String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
Map<String,Object> message= Maps.newHashMap();
message.put("sendTime",sdf.format(new Date()));
message.put("msg", msg);
message.put("msgId",msgId);
return message;
}
@GetMapping("fanout1")
@ApiOperation("发布订阅模式1")
public String fanout1(@RequestParam String msg){
Map<String, Object> map = createMsg(msg);
rabbitTemplate.convertAndSend("fanout",null,map);
return "ok";
}
消费者代码
/**
* 发布订阅模式方法1的消费者1
* @param message 消息
* @param c 通道
* @param msg 消息内容
* @throws IOException
*/
@RabbitListener(bindings =@QueueBinding(
value = @Queue,//这里定义随机队列,默认属性: 随机命名,非持久,排他,自动删除
exchange =@Exchange(name="fanout",declare = "false")//declare = "false":生产者已定义交换机,此处不再声明交换机
))
public void fanout1(Message message,Channel c,Map msg) throws IOException {
MessageProperties properties = message.getMessageProperties();
long tag = properties.getDeliveryTag();
log.info("发布订阅模式方法1的消费者1收到:{}",msg);
//手动回执,不批量签收,回执后才能处理下一批消息
c.basicAck(tag,false);
}
/**
* 发布订阅模式方法1的消费者2
* @param message 消息
* @param c 通道
* @param msg 消息内容
* @throws IOException
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue,//这里定义随机队列,默认属性: 随机命名,非持久,排他,自动删除
exchange =@Exchange(name="fanout",declare = "false")//declare = "false":生产者已定义交换机,此处不再声明交换机
))
public void fanout2(Message message,Channel c,Map msg) throws IOException {
MessageProperties properties = message.getMessageProperties();
long tag = properties.getDeliveryTag();
log.info("发布订阅模式方法1的消费者2收到:{}",msg);
//手动回执,不批量签收,回执后才能处理下一批消息
c.basicAck(tag,false);
}
方法2:生产者创建队列与交换机,消费者监听队列
生产者代码
声明队列与交换机
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 扇出交换机
*/
@Configuration
public class FanoutExchangeConfiguration{
/**
* 定义队列 持久 非排他 非自动删除
* @return
*/
@Bean
public Queue fanoutQueue1(){
return QueueBuilder.durable("fanout-queue1").build();
}
/**
* 定义队列 持久 非排他 非自动删除
* @return
*/
@Bean
public Queue fanoutQueue2(){
return QueueBuilder.durable("fanout-queue2").build();
}
/**
* 定义扇出交换机 持久 非自动删除
* @return
*/
@Bean
public FanoutExchange fanoutExchange(){
return ExchangeBuilder.fanoutExchange("logs").build();
}
/**
* 将队列1与交换机绑定
* @return
*/
@Bean
public Binding fanoutBinding1(){
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
/**
* 将队列2与交换机绑定
* @return
*/
@Bean
public Binding fanoutBinding2(){
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
//创建初始化RabbitAdmin对象
@Bean
public RabbitAdmin fanoutRabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
rabbitAdmin.setAutoStartup(true);
rabbitAdmin.declareExchange(fanoutExchange());
rabbitAdmin.declareQueue(fanoutQueue1());
rabbitAdmin.declareQueue(fanoutQueue2());
return rabbitAdmin;
}
}
发送消息
private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 组装消息
* @param msg
* @return
*/
private static Map<String, Object> createMsg(Object msg) {
String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
Map<String,Object> message= Maps.newHashMap();
message.put("sendTime",sdf.format(new Date()));
message.put("msg", msg);
message.put("msgId",msgId);
return message;
}
@GetMapping("fanout2")
@ApiOperation("发布订阅模式2")
public String fanout(@RequestParam String msg){
Map<String, Object> map = createMsg(msg);
rabbitTemplate.convertAndSend("logs",null,map);
return "ok";
}
消费者代码
/**
* 发布订阅模式方法2的消费者1
* @param message 消息
* @param c 通道
* @param msg 消息内容
* @throws IOException
*/
//使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致
@RabbitListener(queuesToDeclare = @Queue(value = "fanout-queue1"))
public void logs1(Message message,Channel c,Map msg) throws IOException {
MessageProperties properties = message.getMessageProperties();
long tag = properties.getDeliveryTag();
log.info("发布订阅模式方法2的消费者1收到:{}",msg);
//手动回执,不批量签收,回执后才能处理下一批消息
c.basicAck(tag,false);
}
/**
* 发布订阅模式方法2的消费者2
* @param message 消息
* @param c 通道
* @param msg 消息内容
* @throws IOException
*/
//使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致
@RabbitListener(queuesToDeclare = @Queue(value = "fanout-queue2"))
public void logs2(Message message,Channel c,Map msg) throws IOException {
MessageProperties properties = message.getMessageProperties();
long tag = properties.getDeliveryTag();
log.info("发布订阅模式方法2的消费者2收到:{}",msg);
//手动回执,不批量签收,回执后才能处理下一批消息
c.basicAck(tag,false);
}
路由模式
生产者提供通过direct直流交换机给消费者发送消息,消费者通过关键字匹配规则从绑定的队列获取消息
方法1:生产者创建交换机,消费者创建队列与监听队列
生产者代码
创建交换机
/**
* 直流交换机
* @return
*/
@Bean
public Exchange routeExchange(){
//持久化 非自动删除
return ExchangeBuilder.directExchange("route").build();
}
//创建初始化RabbitAdmin对象
@Bean
public RabbitAdmin RabbitAdminroute(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
rabbitAdmin.declareExchange(routeExchange());
return rabbitAdmin;
}
发送消息
private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 组装消息
* @param msg
* @return
*/
private static Map<String, Object> createMsg(Object msg) {
String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
Map<String,Object> message= Maps.newHashMap();
message.put("sendTime",sdf.format(new Date()));
message.put("msg", msg);
message.put("msgId",msgId);
return message;
}
@GetMapping("route")
@ApiOperation("路由模式1")
public String route(@RequestParam String msg, @RequestParam String routingKey){
Map<String, Object> map = createMsg(msg);
rabbitTemplate.convertAndSend("route",routingKey,map);
return "ok";
}
消费者代码
/**
* 路由式方法1的消费者1
* @param message 消息
* @param c 通道
* @param msg 消息内容
* @throws IOException
*/
@RabbitListener(bindings =@QueueBinding(
value = @Queue,//这里定义随机队列,默认属性: 随机命名,非持久,排他,自动删除
exchange =@Exchange(name="route",declare = "false"),//declare = "false":生产者已定义交换机,此处不再声明交换机
key = {"error"}//路由键
))
public void route1(Message message,Channel c,Map msg) throws IOException {
MessageProperties properties = message.getMessageProperties();
String routingKey = properties.getReceivedRoutingKey();
log.info("路由模式方法1的消费者1收到:{},路由键:{}",msg,routingKey);
//手动回执,不批量签收,回执后才能处理下一批消息
long tag = properties.getDeliveryTag();
c.basicAck(tag,false);
}
/**
* 路由式方法1的消费者2
* @param message 消息
* @param c 通道
* @param msg 消息内容
* @throws IOException
*/
@RabbitListener(bindings =@QueueBinding(
value = @Queue,//这里定义随机队列,默认属性: 随机命名,非持久,排他,自动删除
exchange =@Exchange(name="route",declare = "false"),//declare = "false":生产者已定义交换机,此处不再声明交换机
key = {"info","debug"}//路由键
))
public void route2(Message message,Channel c,Map msg) throws IOException {
MessageProperties properties = message.getMessageProperties();
String routingKey = properties.getReceivedRoutingKey();
log.info("路由模式方法1的消费者2收到:{},路由键:{}",msg,routingKey);
//手动回执,不批量签收,回执后才能处理下一批消息
long tag = properties.getDeliveryTag();
c.basicAck(tag,false);
}
方法2:生产者创建队列与交换机,消费者监听队列
生产者代码
声明队列与交换机
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 路由交换机
*/
@Configuration
public class RouteExchangeConfiguration {
/**
* 定义队列 持久 非排他 非自动删除
* @return
*/
@Bean
public Queue directQueue1(){
return QueueBuilder.durable("direct-queue1").build();
}
/**
* 定义队列 持久 非排他 非自动删除
* @return
*/
@Bean
public Queue directQueue2(){
return QueueBuilder.durable("direct-queue2").build();
}
/**
* 定义路由交换机 持久 非自动删除
* @return
*/
@Bean
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange("direct").build();
}
/**
* 将队列1与交换机绑定,路由键:error
* @return
*/
@Bean
public Binding directBinding1(){
return BindingBuilder.bind(directQueue1()).to(directExchange()).with("error");
}
/**
* 将队列2与交换机绑定,路由键:info
* @return
*/
@Bean
public Binding directBinding2(){
return BindingBuilder.bind(directQueue2()).to(directExchange()).with("info");
}
/**
* 将队列2与交换机绑定,路由键:debug
* @return
*/
@Bean
public Binding directBinding3(){
return BindingBuilder.bind(directQueue2()).to(directExchange()).with("debug");
}
//创建初始化RabbitAdmin对象
@Bean
public RabbitAdmin directRabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
rabbitAdmin.setAutoStartup(true);
rabbitAdmin.declareExchange(directExchange());
rabbitAdmin.declareQueue(directQueue1());
rabbitAdmin.declareQueue(directQueue2());
return rabbitAdmin;
}
}
发送消息
private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 组装消息
* @param msg
* @return
*/
private static Map<String, Object> createMsg(Object msg) {
String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
Map<String,Object> message= Maps.newHashMap();
message.put("sendTime",sdf.format(new Date()));
message.put("msg", msg);
message.put("msgId",msgId);
return message;
}
@GetMapping("direct")
@ApiOperation("路由模式2")
public String direct(@RequestParam String msg, @RequestParam String routingKey){
Map<String, Object> map = createMsg(msg);
rabbitTemplate.convertAndSend("direct",routingKey,map);
return "ok";
}
消费者代码
/**
* 路由模式方法2的消费者1
* @param message 消息
* @param c 通道
* @param msg 消息内容
* @throws IOException
*/
//使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致
@RabbitListener(queuesToDeclare = @Queue(value = "direct-queue1"))
public void direct1(Message message,Channel c,Map msg) throws IOException {
MessageProperties properties = message.getMessageProperties();
String routingKey = properties.getReceivedRoutingKey();
log.info("路由模式方法2的消费者1收到:{},路由键:{}",msg,routingKey);
//手动回执,不批量签收,回执后才能处理下一批消息
long tag = properties.getDeliveryTag();
c.basicAck(tag,false);
}
/**
* 路由模式方法2的消费者2
* @param message 消息
* @param c 通道
* @param msg 消息内容
* @throws IOException
*/
//使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致
@RabbitListener(queuesToDeclare = @Queue(value = "direct-queue2"))
public void direct2(Message message,Channel c,Map msg) throws IOException {
MessageProperties properties = message.getMessageProperties();
String routingKey = properties.getReceivedRoutingKey();
log.info("路由模式方法2的消费者2收到:{},路由键:{}",msg,routingKey);
//手动回执,不批量签收,回执后才能处理下一批消息
long tag = properties.getDeliveryTag();
c.basicAck(tag,false);
}
主题模式
生产者通过topic主题交换机给消费者发送消息,消费者通过特殊的关键字匹配规则从绑定的队列取消息
方法1:生产者创建交换机,消费者创建队列与监听队列
生产者代码
创建交换机
/**
* 主题交换机
* @return
*/
@Bean
public Exchange themeExchange(){
//持久化 非自动删除
return ExchangeBuilder.topicExchange("theme").build();
}
//创建初始化RabbitAdmin对象
@Bean
public RabbitAdmin rabbitAdmintheme(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
rabbitAdmin.setAutoStartup(true);
rabbitAdmin.declareExchange(themeExchange());
return rabbitAdmin;
}
发送消息
private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 组装消息
* @param msg
* @return
*/
private static Map<String, Object> createMsg(Object msg) {
String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
Map<String,Object> message= Maps.newHashMap();
message.put("sendTime",sdf.format(new Date()));
message.put("msg", msg);
message.put("msgId",msgId);
return message;
}
@GetMapping("theme")
@ApiOperation("主题模式1")
public String theme(@RequestParam String msg, @RequestParam String routingKey){
Map<String, Object> map = createMsg(msg);
rabbitTemplate.convertAndSend("theme",routingKey,map);
return "ok";
}
消费者代码
/**
* 主题模式方法1的消费者1
* @param message 消息
* @param c 通道
* @param msg 消息内容
* @throws IOException
*/
@RabbitListener(bindings =@QueueBinding(
value = @Queue,//这里定义随机队列,默认属性: 随机命名,非持久,排他,自动删除
exchange =@Exchange(name="theme",type = ExchangeTypes.TOPIC),//declare = "false":生产者已定义交换机,此处不再声明交换机
key = {"*.error.*"}
))
public void theme1(Message message,Channel c,Map msg) throws IOException {
MessageProperties properties = message.getMessageProperties();
String routingKey = properties.getReceivedRoutingKey();
log.info("主题模式方法1的消费者1收到:{},路由键:{}",msg,routingKey);
//手动回执,不批量签收,回执后才能处理下一批消息
long tag = properties.getDeliveryTag();
c.basicAck(tag,false);
}
/**
* 主题模式方法1的消费者2
* @param message 消息
* @param c 通道
* @param msg 消息内容
* @throws IOException
*/
@RabbitListener(bindings =@QueueBinding(
value = @Queue,//这里定义随机队列,默认属性: 随机命名,非持久,排他,自动删除
exchange =@Exchange(name="theme",type = ExchangeTypes.TOPIC),//declare = "false":生产者已定义交换机,此处不再声明交换机
key = {"#.info","debug.#"}
))
public void theme2(Message message,Channel c,Map msg) throws IOException {
MessageProperties properties = message.getMessageProperties();
String routingKey = properties.getReceivedRoutingKey();
log.info("主题模式方法1的消费者2收到:{},路由键:{}",msg,routingKey);
//手动回执,不批量签收,回执后才能处理下一批消息
long tag = properties.getDeliveryTag();
c.basicAck(tag,false);
}
方法2:生产者创建队列与交换机,消费者监听队列
生产者代码
声明队列与交换机
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 主题交换机
*/
@Configuration
public class TopicExchangeConfiguration {
/**
* 定义队列 持久 非排他 非自动删除
* @return
*/
@Bean
public Queue topicQueue1(){
return QueueBuilder.durable("topic-queue1").build();
}
/**
* 定义队列 持久 非排他 非自动删除
* @return
*/
@Bean
public Queue topicQueue2(){
return QueueBuilder.durable("topic-queue2").build();
}
/**
* 定义路由交换机 持久 非自动删除
* @return
*/
@Bean
public TopicExchange topicExchange(){
return ExchangeBuilder.topicExchange("topic").build();
}
/**
* 将队列1与交换机绑定,路由键:A.*
* @return
*/
@Bean
public Binding topicBinding1(){
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("A.*");
}
/**
* 将队列2与交换机绑定,路由键:#.B
* @return
*/
@Bean
public Binding topicBinding2(){
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("#.*");
}
/**
* 将队列2与交换机绑定,路由键:A.B
* @return
*/
@Bean
public Binding topicBinding3(){
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("A.B");
}
//创建初始化RabbitAdmin对象
@Bean
public RabbitAdmin topicRabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
rabbitAdmin.setAutoStartup(true);
rabbitAdmin.declareExchange(topicExchange());
rabbitAdmin.declareQueue(topicQueue1());
rabbitAdmin.declareQueue(topicQueue2());
return rabbitAdmin;
}
}
发送消息
private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 组装消息
* @param msg
* @return
*/
private static Map<String, Object> createMsg(Object msg) {
String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
Map<String,Object> message= Maps.newHashMap();
message.put("sendTime",sdf.format(new Date()));
message.put("msg", msg);
message.put("msgId",msgId);
return message;
}
@GetMapping("topic")
@ApiOperation("主题模式2")
public String topic2(@RequestParam String msg, @RequestParam String routingKey){
Map<String, Object> map = createMsg(msg);
rabbitTemplate.convertAndSend("topic",routingKey,map);
return "ok";
}
消费者代码
/**
* 主题模式方法2的消费者1
* @param message 消息
* @param c 通道
* @param msg 消息内容
* @throws IOException
*/
//使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致
@RabbitListener(queuesToDeclare = @Queue(value = "topic-queue1"))
public void topic1(Message message,Channel c,Map msg) throws IOException {
MessageProperties properties = message.getMessageProperties();
String routingKey = properties.getReceivedRoutingKey();
log.info("主题模式方法2的消费者1收到:{},路由键:{}",msg,routingKey);
//手动回执,不批量签收,回执后才能处理下一批消息
long tag = properties.getDeliveryTag();
c.basicAck(tag,false);
}
/**
* 主题模式方法2的消费者2
* @param message 消息
* @param c 通道
* @param msg 消息内容
* @throws IOException
*/
//使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致
@RabbitListener(queuesToDeclare = @Queue(value = "topic-queue2"))
public void topic2(Message message,Channel c,Map msg) throws IOException {
MessageProperties properties = message.getMessageProperties();
String routingKey = properties.getReceivedRoutingKey();
log.info("主题模式方法2的消费者2收到:{},路由键:{}",msg,routingKey);
//手动回执,不批量签收,回执后才能处理下一批消息
long tag = properties.getDeliveryTag();
c.basicAck(tag,false);
}
异步调用模式
生产者绑定调用队列向消费者发送消息,通过绑定返回队列异步接收消息处理结果
生产者代码
创建发送队列与接收队列
/**
* 定义异步发送队列
* @return
*/
@Bean
public Queue rpcQueue(){
return QueueBuilder.nonDurable("rpc-queue").build();
}
/**
* 定义异步接收队列
* @return
*/
@Bean
public Queue receivedQueue(){
return QueueBuilder.nonDurable(UUID.randomUUID().toString().replaceAll("-","")).build();
}
//创建初始化RabbitAdmin对象
@Bean
public RabbitAdmin rabbitAdmin111(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
rabbitAdmin.setAutoStartup(true);
rabbitAdmin.declareQueue(rpcQueue());
rabbitAdmin.declareQueue(receivedQueue());
return rabbitAdmin;
}
发送与接收消息
@Value("#{receivedQueue.name}")
private String rndQueue;
@GetMapping("rpc")
@ApiOperation("异步调用模式")
public String rpc(@RequestParam Integer n){
rabbitTemplate.convertAndSend("rpc-queue", n, message -> {
MessageProperties properties = message.getMessageProperties();
properties.setReplyTo(rndQueue);
properties.setCorrelationId(UUID.randomUUID().toString());
return message;
}
);
return "ok";
}
//从随机队列接收计算结果
@RabbitListener(queues = "#{receivedQueue.name}")
public void receive(long r, @Header(name= AmqpHeaders.CORRELATION_ID) String correlationId) {
log.info("\n\n"+correlationId+" - 收到: "+r);
}
消费者代码
/**
* 异步调用
* @param n
* @return
*/
//使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致
@RabbitListener(queuesToDeclare = @Queue(value = "rpc-queue",durable = "false"))
public long getFbnq(int n,Message message,Channel c) throws IOException {
MessageProperties properties = message.getMessageProperties();
long tag = properties.getDeliveryTag();
long result = f(n);
c.basicAck(tag,false);
return result;
}
private long f(int n) {
if (n==1 || n==2) {
return 1;
}
return f(n-1) + f(n-2);
}