SpringBoot整合RabbitMQ(六种工作模式介绍)

news2025/1/15 17:34:56

介绍

RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。
RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。

使用场景

1.服务解耦:生产者与消费者之间不是直接调用的,中间存在消息中间件,生产者不需要关心消费者的调用情况
2.流量削峰:在高并发的情况下,系统服务没法及时处理大量的并发请求,此时可以把并发请求发送消费队列中,消费者从队列获取请求并处理,从而减少系统的压力
3.异步调用:任务之间可以异步执行,从而减少整体执行时长

基本组成

PRODUCER生产者、COMSUMER消费者、Exchange交换机、Message Queue消息队列、Binding Key普通键、Routing Key路由键、Broker中间件(包含队列与交换机)
其中:PRODUCER生产者负责发送消息
COMSUMER消费者负责响应消息

yml文件配置

生产者:

spring:
  application:
    name: rabbitmq-server
  #RabbitMQ
  rabbitmq:
    #ip
    host: 192.168.17.128
    #用户名
    username: rabbitmq
    #密码
    password: rabbitmq
    #端口
    port: 5673
    #虚拟主机名,默认/
    virtual-host: rabbitmq

消费者:

spring:
  application:
    name: rabbitmq-consumer
  #RabbitMQ
  rabbitmq:
    #ip
    host: 192.168.17.128
    #用户名
    username: rabbitmq
    #密码
    password: rabbitmq
    #端口
    port: 5673
    #虚拟主机名
    virtual-host: rabbitmq
    listener:
      simple:
        #同一时间抓取的数量,待处理完再抓取
        prefetch: 1
        #设置手动签收,默认自动签收
        acknowledge-mode: manual

简单模式

一个生产者,一个消费者
在这里插入图片描述

生产者代码

声明队列

	/**
     * 简单模式队列
     * @return
     */
    @Bean
    public Queue simpleQueue(){
        //持久化 非独占 非自动删除
        return QueueBuilder.durable("simpleQueue").build();
    }

发送消息

	private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private RabbitTemplate rabbitTemplate;
   
	/**
     * 组装消息
     * @param msg
     * @return
     */
    private static Map<String, Object> createMsg(Object msg) {
        String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
        Map<String,Object> message= Maps.newHashMap();
        message.put("sendTime",sdf.format(new Date()));
        message.put("msg", msg);
        message.put("msgId",msgId);
        return message;
    }
    
	@GetMapping("simple")
    @ApiOperation("简单模式")
    public String simple(@RequestParam String msg){
        Map<String, Object> map = createMsg(msg);
        rabbitTemplate.convertAndSend("simpleQueue",map);
        return "ok";
    }

消费者代码

	/**
     * 简单模式的消费者
     * @param message 消息
     * @param c 通道
     * @param msg 消息内容
     * @throws IOException
     */
    //使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致
    @RabbitListener(queuesToDeclare = @Queue(value = "simpleQueue"))
    public void simple(Message message,Channel c,Map msg) throws IOException {
        MessageProperties properties = message.getMessageProperties();
        long tag = properties.getDeliveryTag();
        log.info("简单模式的消费者收到:{}",msg);
        //由于在yml设置手动回执,此处需要手动回执,不批量签收,回执后才能处理下一批消息
        c.basicAck(tag,false);
    }

工作模式

一个生产者向队列发送消息,多个消费者从同一个队列取消息
是否手动回执ACk,合理分发QOS,消息持久化
在这里插入图片描述
在这里插入图片描述

生产者代码

声明队列

 	/**
     * 工作模式队列
     * @return
     */
    @Bean
    public Queue workQueue(){
        return QueueBuilder.durable("workQueue").build();
    }

发送消息

	private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private RabbitTemplate rabbitTemplate;
   
	/**
     * 组装消息
     * @param msg
     * @return
     */
    private static Map<String, Object> createMsg(Object msg) {
        String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
        Map<String,Object> message= Maps.newHashMap();
        message.put("sendTime",sdf.format(new Date()));
        message.put("msg", msg);
        message.put("msgId",msgId);
        return message;
    }
    
 	@GetMapping("work")
    @ApiOperation("工作模式")
    public String work(){
        for (int i = 0; i <100; i++) {
            rabbitTemplate.convertAndSend("workQueue",createMsg(i),message -> {
                MessageProperties messageProperties = message.getMessageProperties();
                //默认消息持久化,设置消息不持久化
                messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
                return message;
            });
        }
        return "ok";
    }

消费者代码

	/**
     * 工作模式的消费者1
     * @param message 消息
     * @param c 通道
     * @param msg 消息内容
     * @throws IOException
     */
    //使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致
    @RabbitListener(queuesToDeclare = @Queue(value = "workQueue"))
    public void work1(Message message,Channel c,Map msg) throws IOException {
        MessageProperties properties = message.getMessageProperties();
        long tag = properties.getDeliveryTag();
        log.info("工作模式的消费者1收到:{}",msg);
        //手动回执,不批量签收,回执后才能处理下一批消息
        c.basicAck(tag,false);
    }

    /**
     * 工作模式的消费者2
     * @param message 消息
     * @param c 通道
     * @param msg 消息内容
     * @throws IOException
     */
    //使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致
    @RabbitListener(queuesToDeclare = @Queue(value = "workQueue"))
    public void work2(Message message,Channel c,Map msg) throws IOException {
        MessageProperties properties = message.getMessageProperties();
        long tag = properties.getDeliveryTag();
        log.info("工作模式的消费者2收到:{}",msg);
        //手动回执,不批量签收,回执后才能处理下一批消息
        c.basicAck(tag,false);
    }

发布订阅模式

生产者通过fanout扇出交换机群发消息给消费者,同一条消息每一个消费者都可以收到
在这里插入图片描述
在这里插入图片描述

方法1:生产者创建交换机,消费者创建队列与监听队列

生产者代码

创建交换机
	 /**
     * fanout交换机
     * @return
     */
    @Bean
    public Exchange fautExchange1(){
        //持久化 非自动删除
        return ExchangeBuilder.fanoutExchange("fanout").build();
    }
    
 	//创建初始化RabbitAdmin对象
    @Bean
    public RabbitAdmin fanoutRabbitAdmin1(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
        rabbitAdmin.setAutoStartup(true);
        rabbitAdmin.declareExchange(fanoutExchange1());
        return rabbitAdmin;
    }
发送消息
	private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private RabbitTemplate rabbitTemplate;
   
	/**
     * 组装消息
     * @param msg
     * @return
     */
    private static Map<String, Object> createMsg(Object msg) {
        String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
        Map<String,Object> message= Maps.newHashMap();
        message.put("sendTime",sdf.format(new Date()));
        message.put("msg", msg);
        message.put("msgId",msgId);
        return message;
    }
	
	@GetMapping("fanout1")
    @ApiOperation("发布订阅模式1")
    public String fanout1(@RequestParam String msg){
        Map<String, Object> map = createMsg(msg);
        rabbitTemplate.convertAndSend("fanout",null,map);
        return "ok";
    }
消费者代码
	/**
     * 发布订阅模式方法1的消费者1
     * @param message 消息
     * @param c 通道
     * @param msg 消息内容
     * @throws IOException
     */
    @RabbitListener(bindings =@QueueBinding(
            value = @Queue,//这里定义随机队列,默认属性: 随机命名,非持久,排他,自动删除
            exchange =@Exchange(name="fanout",declare = "false")//declare = "false":生产者已定义交换机,此处不再声明交换机
    ))
    public void fanout1(Message message,Channel c,Map msg) throws IOException {
        MessageProperties properties = message.getMessageProperties();
        long tag = properties.getDeliveryTag();
        log.info("发布订阅模式方法1的消费者1收到:{}",msg);
        //手动回执,不批量签收,回执后才能处理下一批消息
        c.basicAck(tag,false);
    }

    /**
     * 发布订阅模式方法1的消费者2
     * @param message 消息
     * @param c 通道
     * @param msg 消息内容
     * @throws IOException
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,//这里定义随机队列,默认属性: 随机命名,非持久,排他,自动删除
            exchange =@Exchange(name="fanout",declare = "false")//declare = "false":生产者已定义交换机,此处不再声明交换机
    ))
    public void fanout2(Message message,Channel c,Map msg) throws IOException {
        MessageProperties properties = message.getMessageProperties();
        long tag = properties.getDeliveryTag();
        log.info("发布订阅模式方法1的消费者2收到:{}",msg);
        //手动回执,不批量签收,回执后才能处理下一批消息
        c.basicAck(tag,false);
    }

方法2:生产者创建队列与交换机,消费者监听队列

生产者代码

声明队列与交换机
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * 扇出交换机
 */
@Configuration
public class FanoutExchangeConfiguration{


    /**
     * 定义队列 持久 非排他 非自动删除
     * @return
     */
    @Bean
    public Queue fanoutQueue1(){
        return QueueBuilder.durable("fanout-queue1").build();
    }

    /**
     * 定义队列 持久 非排他 非自动删除
     * @return
     */
    @Bean
    public Queue fanoutQueue2(){
        return QueueBuilder.durable("fanout-queue2").build();
    }

    /**
     * 定义扇出交换机 持久  非自动删除
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return ExchangeBuilder.fanoutExchange("logs").build();
    }

    /**
     * 将队列1与交换机绑定
     * @return
     */
    @Bean
    public Binding fanoutBinding1(){
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }

    /**
     * 将队列2与交换机绑定
     * @return
     */
    @Bean
    public Binding fanoutBinding2(){
        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }

    //创建初始化RabbitAdmin对象
    @Bean
    public RabbitAdmin fanoutRabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
        rabbitAdmin.setAutoStartup(true);
        rabbitAdmin.declareExchange(fanoutExchange());
        rabbitAdmin.declareQueue(fanoutQueue1());
        rabbitAdmin.declareQueue(fanoutQueue2());
        return rabbitAdmin;
    }
}
发送消息
	private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private RabbitTemplate rabbitTemplate;
   
	/**
     * 组装消息
     * @param msg
     * @return
     */
    private static Map<String, Object> createMsg(Object msg) {
        String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
        Map<String,Object> message= Maps.newHashMap();
        message.put("sendTime",sdf.format(new Date()));
        message.put("msg", msg);
        message.put("msgId",msgId);
        return message;
    }
	
	@GetMapping("fanout2")
	@ApiOperation("发布订阅模式2")
	public String fanout(@RequestParam String msg){
	    Map<String, Object> map = createMsg(msg);
	    rabbitTemplate.convertAndSend("logs",null,map);
	    return "ok";
	}

消费者代码

	/**
     * 发布订阅模式方法2的消费者1
     * @param message 消息
     * @param c 通道
     * @param msg 消息内容
     * @throws IOException
     */
    //使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致
    @RabbitListener(queuesToDeclare = @Queue(value = "fanout-queue1"))
    public void logs1(Message message,Channel c,Map msg) throws IOException {
        MessageProperties properties = message.getMessageProperties();
        long tag = properties.getDeliveryTag();
        log.info("发布订阅模式方法2的消费者1收到:{}",msg);
        //手动回执,不批量签收,回执后才能处理下一批消息
        c.basicAck(tag,false);
    }

    /**
     * 发布订阅模式方法2的消费者2
     * @param message 消息
     * @param c 通道
     * @param msg 消息内容
     * @throws IOException
     */
    //使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致
    @RabbitListener(queuesToDeclare = @Queue(value = "fanout-queue2"))
    public void logs2(Message message,Channel c,Map msg) throws IOException {
        MessageProperties properties = message.getMessageProperties();
        long tag = properties.getDeliveryTag();
        log.info("发布订阅模式方法2的消费者2收到:{}",msg);
        //手动回执,不批量签收,回执后才能处理下一批消息
        c.basicAck(tag,false);
    }

路由模式

生产者提供通过direct直流交换机给消费者发送消息,消费者通过关键字匹配规则从绑定的队列获取消息
在这里插入图片描述

在这里插入图片描述

方法1:生产者创建交换机,消费者创建队列与监听队列

生产者代码

创建交换机
	 /**
     * 直流交换机
     * @return
     */
    @Bean
    public Exchange routeExchange(){
        //持久化 非自动删除
        return ExchangeBuilder.directExchange("route").build();
    }
    
	//创建初始化RabbitAdmin对象
    @Bean
    public RabbitAdmin RabbitAdminroute(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
        rabbitAdmin.declareExchange(routeExchange());
        return rabbitAdmin;
    }
发送消息
	private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private RabbitTemplate rabbitTemplate;
   
	/**
     * 组装消息
     * @param msg
     * @return
     */
    private static Map<String, Object> createMsg(Object msg) {
        String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
        Map<String,Object> message= Maps.newHashMap();
        message.put("sendTime",sdf.format(new Date()));
        message.put("msg", msg);
        message.put("msgId",msgId);
        return message;
    }

 	@GetMapping("route")
    @ApiOperation("路由模式1")
    public String route(@RequestParam String msg, @RequestParam String routingKey){
        Map<String, Object> map = createMsg(msg);
        rabbitTemplate.convertAndSend("route",routingKey,map);
        return "ok";
    }
消费者代码
	 /**
     * 路由式方法1的消费者1
     * @param message 消息
     * @param c 通道
     * @param msg 消息内容
     * @throws IOException
     */
    @RabbitListener(bindings =@QueueBinding(
            value = @Queue,//这里定义随机队列,默认属性: 随机命名,非持久,排他,自动删除
            exchange =@Exchange(name="route",declare = "false"),//declare = "false":生产者已定义交换机,此处不再声明交换机
            key = {"error"}//路由键
    ))
    public void route1(Message message,Channel c,Map msg) throws IOException {
        MessageProperties properties = message.getMessageProperties();
        String routingKey = properties.getReceivedRoutingKey();
        log.info("路由模式方法1的消费者1收到:{},路由键:{}",msg,routingKey);
        //手动回执,不批量签收,回执后才能处理下一批消息
        long tag = properties.getDeliveryTag();
        c.basicAck(tag,false);
    }

    /**
     * 路由式方法1的消费者2
     * @param message 消息
     * @param c 通道
     * @param msg 消息内容
     * @throws IOException
     */
    @RabbitListener(bindings =@QueueBinding(
            value = @Queue,//这里定义随机队列,默认属性: 随机命名,非持久,排他,自动删除
            exchange =@Exchange(name="route",declare = "false"),//declare = "false":生产者已定义交换机,此处不再声明交换机
            key = {"info","debug"}//路由键
    ))
    public void route2(Message message,Channel c,Map msg) throws IOException {
        MessageProperties properties = message.getMessageProperties();
        String routingKey = properties.getReceivedRoutingKey();
        log.info("路由模式方法1的消费者2收到:{},路由键:{}",msg,routingKey);
        //手动回执,不批量签收,回执后才能处理下一批消息
        long tag = properties.getDeliveryTag();
        c.basicAck(tag,false);
    }

方法2:生产者创建队列与交换机,消费者监听队列

生产者代码

声明队列与交换机
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * 路由交换机
 */
@Configuration
public class RouteExchangeConfiguration {


    /**
     * 定义队列 持久 非排他 非自动删除
     * @return
     */
    @Bean
    public Queue directQueue1(){
        return  QueueBuilder.durable("direct-queue1").build();
    }

    /**
     * 定义队列 持久 非排他 非自动删除
     * @return
     */
    @Bean
    public Queue directQueue2(){
        return  QueueBuilder.durable("direct-queue2").build();
    }

    /**
     * 定义路由交换机 持久  非自动删除
     * @return
     */
    @Bean
    public DirectExchange directExchange(){
        return  ExchangeBuilder.directExchange("direct").build();
    }

    /**
     * 将队列1与交换机绑定,路由键:error
     * @return
     */
    @Bean
    public Binding directBinding1(){
        return BindingBuilder.bind(directQueue1()).to(directExchange()).with("error");
    }

    /**
     * 将队列2与交换机绑定,路由键:info
     * @return
     */
    @Bean
    public Binding directBinding2(){
        return BindingBuilder.bind(directQueue2()).to(directExchange()).with("info");
    }

    /**
     * 将队列2与交换机绑定,路由键:debug
     * @return
     */
    @Bean
    public Binding directBinding3(){
        return BindingBuilder.bind(directQueue2()).to(directExchange()).with("debug");
    }

    //创建初始化RabbitAdmin对象
    @Bean
    public RabbitAdmin directRabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
        rabbitAdmin.setAutoStartup(true);
        rabbitAdmin.declareExchange(directExchange());
        rabbitAdmin.declareQueue(directQueue1());
        rabbitAdmin.declareQueue(directQueue2());
        return rabbitAdmin;
    }
}
发送消息
private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private RabbitTemplate rabbitTemplate;
   
	/**
     * 组装消息
     * @param msg
     * @return
     */
    private static Map<String, Object> createMsg(Object msg) {
        String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
        Map<String,Object> message= Maps.newHashMap();
        message.put("sendTime",sdf.format(new Date()));
        message.put("msg", msg);
        message.put("msgId",msgId);
        return message;
    }
    
	@GetMapping("direct")
    @ApiOperation("路由模式2")
    public String direct(@RequestParam String msg, @RequestParam String routingKey){
        Map<String, Object> map = createMsg(msg);
        rabbitTemplate.convertAndSend("direct",routingKey,map);
        return "ok";
    }

消费者代码

	/**
     * 路由模式方法2的消费者1
     * @param message 消息
     * @param c 通道
     * @param msg 消息内容
     * @throws IOException
     */
    //使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致
    @RabbitListener(queuesToDeclare = @Queue(value = "direct-queue1"))
    public void direct1(Message message,Channel c,Map msg) throws IOException {
        MessageProperties properties = message.getMessageProperties();
        String routingKey = properties.getReceivedRoutingKey();
        log.info("路由模式方法2的消费者1收到:{},路由键:{}",msg,routingKey);
        //手动回执,不批量签收,回执后才能处理下一批消息
        long tag = properties.getDeliveryTag();
        c.basicAck(tag,false);
    }

    /**
     * 路由模式方法2的消费者2
     * @param message 消息
     * @param c 通道
     * @param msg 消息内容
     * @throws IOException
     */
    //使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致
    @RabbitListener(queuesToDeclare = @Queue(value = "direct-queue2"))
    public void direct2(Message message,Channel c,Map msg) throws IOException {
        MessageProperties properties = message.getMessageProperties();
        String routingKey = properties.getReceivedRoutingKey();
        log.info("路由模式方法2的消费者2收到:{},路由键:{}",msg,routingKey);
        //手动回执,不批量签收,回执后才能处理下一批消息
        long tag = properties.getDeliveryTag();
        c.basicAck(tag,false);
    }

主题模式

生产者通过topic主题交换机给消费者发送消息,消费者通过特殊的关键字匹配规则从绑定的队列取消息
在这里插入图片描述
在这里插入图片描述

方法1:生产者创建交换机,消费者创建队列与监听队列

生产者代码

创建交换机
	/**
     * 主题交换机
     * @return
     */
    @Bean
    public Exchange themeExchange(){
        //持久化 非自动删除
        return ExchangeBuilder.topicExchange("theme").build();
    }
    
	//创建初始化RabbitAdmin对象
    @Bean
    public RabbitAdmin rabbitAdmintheme(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
        rabbitAdmin.setAutoStartup(true);
        rabbitAdmin.declareExchange(themeExchange());
        return rabbitAdmin;
    }
发送消息
private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private RabbitTemplate rabbitTemplate;
   
	/**
     * 组装消息
     * @param msg
     * @return
     */
    private static Map<String, Object> createMsg(Object msg) {
        String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
        Map<String,Object> message= Maps.newHashMap();
        message.put("sendTime",sdf.format(new Date()));
        message.put("msg", msg);
        message.put("msgId",msgId);
        return message;
    }
    
 	@GetMapping("theme")
    @ApiOperation("主题模式1")
    public String theme(@RequestParam String msg, @RequestParam String routingKey){
        Map<String, Object> map = createMsg(msg);
        rabbitTemplate.convertAndSend("theme",routingKey,map);
        return "ok";
    }

消费者代码

	/**
     * 主题模式方法1的消费者1
     * @param message 消息
     * @param c 通道
     * @param msg 消息内容
     * @throws IOException
     */
    @RabbitListener(bindings =@QueueBinding(
            value = @Queue,//这里定义随机队列,默认属性: 随机命名,非持久,排他,自动删除
            exchange =@Exchange(name="theme",type = ExchangeTypes.TOPIC),//declare = "false":生产者已定义交换机,此处不再声明交换机
            key = {"*.error.*"}
    ))
    public void theme1(Message message,Channel c,Map msg) throws IOException {
        MessageProperties properties = message.getMessageProperties();
        String routingKey = properties.getReceivedRoutingKey();
        log.info("主题模式方法1的消费者1收到:{},路由键:{}",msg,routingKey);
        //手动回执,不批量签收,回执后才能处理下一批消息
        long tag = properties.getDeliveryTag();
        c.basicAck(tag,false);
    }

    /**
     * 主题模式方法1的消费者2
     * @param message 消息
     * @param c 通道
     * @param msg 消息内容
     * @throws IOException
     */
    @RabbitListener(bindings =@QueueBinding(
            value = @Queue,//这里定义随机队列,默认属性: 随机命名,非持久,排他,自动删除
            exchange =@Exchange(name="theme",type = ExchangeTypes.TOPIC),//declare = "false":生产者已定义交换机,此处不再声明交换机
            key = {"#.info","debug.#"}
    ))
    public void theme2(Message message,Channel c,Map msg) throws IOException {
        MessageProperties properties = message.getMessageProperties();
        String routingKey = properties.getReceivedRoutingKey();
        log.info("主题模式方法1的消费者2收到:{},路由键:{}",msg,routingKey);
        //手动回执,不批量签收,回执后才能处理下一批消息
        long tag = properties.getDeliveryTag();
        c.basicAck(tag,false);
    }

方法2:生产者创建队列与交换机,消费者监听队列

生产者代码

声明队列与交换机
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * 主题交换机
 */
@Configuration
public class TopicExchangeConfiguration {


    /**
     * 定义队列 持久 非排他 非自动删除
     * @return
     */
    @Bean
    public Queue topicQueue1(){
        return  QueueBuilder.durable("topic-queue1").build();
    }

    /**
     * 定义队列 持久 非排他 非自动删除
     * @return
     */
    @Bean
    public Queue topicQueue2(){
        return  QueueBuilder.durable("topic-queue2").build();
    }

    /**
     * 定义路由交换机 持久  非自动删除
     * @return
     */
    @Bean
    public TopicExchange topicExchange(){
        return  ExchangeBuilder.topicExchange("topic").build();
    }

    /**
     * 将队列1与交换机绑定,路由键:A.*
     * @return
     */
    @Bean
    public Binding topicBinding1(){
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("A.*");
    }

    /**
     * 将队列2与交换机绑定,路由键:#.B
     * @return
     */
    @Bean
    public Binding topicBinding2(){
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("#.*");
    }

    /**
     * 将队列2与交换机绑定,路由键:A.B
     * @return
     */
    @Bean
    public Binding topicBinding3(){
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("A.B");
    }

    //创建初始化RabbitAdmin对象
    @Bean
    public RabbitAdmin topicRabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
        rabbitAdmin.setAutoStartup(true);
        rabbitAdmin.declareExchange(topicExchange());
        rabbitAdmin.declareQueue(topicQueue1());
        rabbitAdmin.declareQueue(topicQueue2());
        return rabbitAdmin;
    }
}
发送消息
private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private RabbitTemplate rabbitTemplate;
   
	/**
     * 组装消息
     * @param msg
     * @return
     */
    private static Map<String, Object> createMsg(Object msg) {
        String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
        Map<String,Object> message= Maps.newHashMap();
        message.put("sendTime",sdf.format(new Date()));
        message.put("msg", msg);
        message.put("msgId",msgId);
        return message;
    }
    
	@GetMapping("topic")
    @ApiOperation("主题模式2")
    public String topic2(@RequestParam String msg, @RequestParam String routingKey){
        Map<String, Object> map = createMsg(msg);
        rabbitTemplate.convertAndSend("topic",routingKey,map);
        return "ok";
    }
消费者代码
	/**
     * 主题模式方法2的消费者1
     * @param message 消息
     * @param c 通道
     * @param msg 消息内容
     * @throws IOException
     */
    //使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致
    @RabbitListener(queuesToDeclare = @Queue(value = "topic-queue1"))
    public void topic1(Message message,Channel c,Map msg) throws IOException {
        MessageProperties properties = message.getMessageProperties();
        String routingKey = properties.getReceivedRoutingKey();
        log.info("主题模式方法2的消费者1收到:{},路由键:{}",msg,routingKey);
        //手动回执,不批量签收,回执后才能处理下一批消息
        long tag = properties.getDeliveryTag();
        c.basicAck(tag,false);
    }

    /**
     * 主题模式方法2的消费者2
     * @param message 消息
     * @param c 通道
     * @param msg 消息内容
     * @throws IOException
     */
    //使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致
    @RabbitListener(queuesToDeclare = @Queue(value = "topic-queue2"))
    public void topic2(Message message,Channel c,Map msg) throws IOException {
        MessageProperties properties = message.getMessageProperties();
        String routingKey = properties.getReceivedRoutingKey();
        log.info("主题模式方法2的消费者2收到:{},路由键:{}",msg,routingKey);
        //手动回执,不批量签收,回执后才能处理下一批消息
        long tag = properties.getDeliveryTag();
        c.basicAck(tag,false);
    }

异步调用模式

生产者绑定调用队列向消费者发送消息,通过绑定返回队列异步接收消息处理结果

生产者代码

创建发送队列与接收队列

	 /**
     * 定义异步发送队列
     * @return
     */
    @Bean
    public Queue rpcQueue(){
        return QueueBuilder.nonDurable("rpc-queue").build();
    }

    /**
     * 定义异步接收队列
     * @return
     */
    @Bean
    public Queue receivedQueue(){
        return QueueBuilder.nonDurable(UUID.randomUUID().toString().replaceAll("-","")).build();
    }
    
	//创建初始化RabbitAdmin对象
    @Bean
    public RabbitAdmin rabbitAdmin111(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
        rabbitAdmin.setAutoStartup(true);
        rabbitAdmin.declareQueue(rpcQueue());
        rabbitAdmin.declareQueue(receivedQueue());
        return rabbitAdmin;
    }

发送与接收消息

	@Value("#{receivedQueue.name}")
    private String rndQueue;

    @GetMapping("rpc")
    @ApiOperation("异步调用模式")
    public String rpc(@RequestParam Integer n){
        rabbitTemplate.convertAndSend("rpc-queue", n, message -> {
                    MessageProperties properties = message.getMessageProperties();
                    properties.setReplyTo(rndQueue);
                    properties.setCorrelationId(UUID.randomUUID().toString());
                    return message;
                }
        );
        return "ok";
    }

    //从随机队列接收计算结果
    @RabbitListener(queues = "#{receivedQueue.name}")
    public void receive(long r, @Header(name= AmqpHeaders.CORRELATION_ID) String correlationId) {
       log.info("\n\n"+correlationId+" - 收到: "+r);
    }

消费者代码

	/**
     * 异步调用
     * @param n
     * @return
     */
    //使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致
    @RabbitListener(queuesToDeclare = @Queue(value = "rpc-queue",durable = "false"))
    public long getFbnq(int n,Message message,Channel c) throws IOException {
        MessageProperties properties = message.getMessageProperties();
        long tag = properties.getDeliveryTag();
        long result = f(n);
        c.basicAck(tag,false);
        return result;
    }

    private long f(int n) {
        if (n==1 || n==2) {
            return 1;
        }
        return f(n-1) + f(n-2);
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/419454.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Unity开发数字化看板-通用的设备运动同步

通用的设备运动同步 通过获取实时采集运动位置&#xff0c;发送到unity程序中&#xff0c;通过比例运算&#xff0c;转换成模型的运动位置&#xff0c;实现虚实同步。 在工业设备中&#xff0c;复杂的运动进行分析、分解&#xff0c;最本质的的运动就是平移和转动&#xff0c…

美团到店暑期实习Java一面

目录1.rabbitmq如何避免消息丢失 &#xff08;三个阶段&#xff09;2.如何保证消息的顺序性3.如何保证消息不被重复消费4.缓存与数据库的不一致性5.redis的过期策略和内存淘汰策略6.简单说下LRU的实现7.用原生redis实现分布式锁&#xff0c;锁误删的情况8.锁续期如何去考量9.缓…

MySQL-----库的操作

文章目录前言一、创建数据库二、创建数据库实例三、字符集和校验规则1.查看系统默认字符集以及校验规则.2.查看数据库支持的字符集3.查看数据库支持的字符集校验规则4.校验规则对数据库的影响四、操纵数据库1.查看数据库2.显示创建语句4.修改数据库5.删除数据库6.备份和恢复6.1…

Diffusion模型系列文章

DDPM 论文 扩散模型包括两个过程&#xff1a;前向过程&#xff08;forward process&#xff09;和反向过程&#xff08;reverse process&#xff09;&#xff0c;其中前向过程又称为扩散过程&#xff08;diffusion process&#xff09;&#xff0c;如下图所示&#xff0c;从x…

【音视频第8天】mediasoup拥塞控制【未完待续】

WebRTC的拥塞控制方式主要有以下几个&#xff1a;Transport-cc、BBR-congestion、remb&#xff08;BBR已被google从webrtc移除了&#xff09;。mediasoup支持Transport-cc和remb。 一、前言 实时通信的延时指标 视频服务质量指标 音视频服务质量与带宽之间的矛盾、实时性与服…

【微信小程序】初识微信小程序组件

作者简介&#xff1a;一名C站萌新&#xff0c;前来进行小程序的前进之路博主主页&#xff1a;大熊李子&#x1f43b; 一、组件的创建与引用 1.1 创建组件 在项目的根目录中&#xff0c;鼠标右键&#xff0c;创建 components -> test 文件夹在新建的 components -> test…

NLP / LLMs中的Temperature 是什么?

ChatGPT, GPT-3, GPT-3.5, GPT-4, LLaMA, Bard等大型语言模型的一个重要的超参数 大型语言模型能够根据给定的上下文或提示生成新文本&#xff0c;由于神经网络等深度学习技术的进步&#xff0c;这些模型越来越受欢迎。可用于控制生成语言模型行为的关键参数之一是Temperature …

[译]自下而上认识Elasticsearch

注意:原文发表时间是13年,所以实现有可能与新版不一致. 原文地址:https://www.elastic.co/cn/blog/found-elasticsearch-from-the-bottom-up Introduction 在本系列文章中,我们从一个新的视角来看ElasticSearch.我们将从下往上,从抽象的底层实现到用户可见层,我们在向上移动的…

【JaveEE】网络编程之TCP套接字、UDP套接字

目录 1.网络编程的基本概念 1.1为什么需要网络编程 1.2服务端与用户端 1.3网络编程五元组 1.4套接字的概念 2.UDP套接字编程 2.1UDP套接字的特点 2.2UDP套接字API 2.2.1DatagramSocket类 2.2.2DatagramPacket类 2.2.3基于UDP的回显程序 2.2.4基于UDP的单词查询 …

免疫力低会怎么样 什么情况会导致免疫降低

都说免疫力是很重要的&#xff0c;它会我们健康的第一道防线&#xff0c;但是当免疫力降低的时候&#xff0c;会出现哪些情况&#xff1f;为什么免疫力会降低&#xff1f; 免疫力是人体的防御系统&#xff0c;就像是维持人体正常运转的军队。免疫力的高低&#xff0c;一定程度上…

再探pytorch的Dataset和DataLoader

本文参加新星计划人工智能(Pytorch)赛道&#xff1a;https://bbs.csdn.net/topics/613989052本文从分类、检测、分割三大任务的角度来剖析pytorch得dataset和dataloader源码&#xff0c;可以让初学者深刻理解每个参数的由来和使用&#xff0c;并轻松自定义dataset。思考&#x…

SQL LIMIT

SQL LIMIT SQL LIMIT子句简介 要检索查询返回的行的一部分&#xff0c;请使用LIMIT和OFFSET子句。 以下说明了这些子句的语法&#xff1a; SELECT column_list FROMtable1 ORDER BY column_list LIMIT row_count OFFSET offset;在这个语法中&#xff0c; row_count确定将返…

Html5版贪吃蛇游戏制作(经典玩法)

回味经典小游戏&#xff0c;用Html5做了个贪吃蛇的小游戏&#xff0c;完成了核心经典玩法的功能。 游戏可以通过电脑的键盘“方向键”控制&#xff0c;也可以点击屏幕中的按钮进行控制。&#xff08;支持移动端哈&#xff09; 点击这里试玩 蛇的移动是在18 x 18的格子中进行移…

sqoop数据导入

创建数据库 mysql全表数据导入hdfs mysql查询数据导入hdfs mysql指定列导入hdfs 使用查询条件关键字将mysql数据导入hdfs mysql数据导入hive 创建数据库 hive中创建user表 create table users( id bigint, name string ) row format delimited fields terminated by &…

数据结构 - 归并排序 | C

思路分析 什么是归并&#xff1f; 示例&#xff1a;&#xff08;归并后的结果copy到原数组&#xff09; 逻辑&#xff1a; if (a[begin1] < a[begin2]) {tmp[i] a[begin1];} else {tmp[i] a[begin2];} 归并排序 分解到“有序”再归并 递归 int middle (left righ…

哈希——unordered系列关联式容器

目录 unordered系列关联式容器 概念 unordered_map 无序去重 operator[] unordered_set 无序去重 OJ练习题 重复n次的元素 两个数组的交集 两个数的交集二 底层结构 概念 哈希冲突 闭散列 结点的定义 扩容 字符串取模 插入 查找 删除 闭散列完整代码 开…

安卓远程控制软件哪个好用

如果您曾希望将个人电脑放在口袋里&#xff0c;那么您可能只需要安卓远程访问软件。 没有远程访问应用程序&#xff1a;使用和控制计算机的唯一方法是坐在计算机前并手动输入命令。 使用远程访问应用程序&#xff1a;您可以在世界任何地方通过 Internet 连接从您的安卓平板电…

【30天python从零到一】---第七天:列表和元组

&#x1f34e; 博客主页&#xff1a;&#x1f319;披星戴月的贾维斯 &#x1f34e; 欢迎关注&#xff1a;&#x1f44d;点赞&#x1f343;收藏&#x1f525;留言 &#x1f347;系列专栏&#xff1a;&#x1f319; Python专栏 &#x1f319;请不要相信胜利就像山坡上的蒲公英一样…

计算机组成原理---第五章中央处理器

&#xff08;一&#xff09;CPU 的功能和组成 CPU 的功能 Ⅰ 概述&#xff1a;当程序指令装入内存储器后&#xff0c;CPU 用来自动完成取指令和执行指令的任务。 Ⅱ CPU 的功能&#xff1a;①指令控制 ②操作控制 ③时间控制 ④数据加工 2.CPU 的基本组成 CPU 的基本部分为运…

【论文阅读】[JBHI] VLTENet、[ISBI]

[JBHI] VLTENet 论文连接&#xff1a;VLTENet: A Deep-Learning-Based Vertebra Localization and Tilt Estimation Network for Automatic Cobb Angle Estimation | IEEE Journals & Magazine | IEEE Xplore Published in: IEEE Journal of Biomedical and Health Infor…