RabbitMQ: return机制

news2024/10/5 23:26:25

1. Return机制

Confirm只能保证消息到达exchange,无法保证消息可以被exchange分发到指定queue。

而且exchange是不能持久化消息的,queue是可以持久化消息。

采用Return机制来监听消息是否从exchange送到了指定的queue中

 2.Java的实现方式

1.导入依赖

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>

2.生产者的实现方式

 采用Return机制来监听消息是否从exchange送到了指定的queue中

package com.qf.mq2302.hello;

import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ReturnListener;

import java.io.IOException;

public class SendRetrun {
    public static final String QUEUE_NAME="hello-queue";

    public static void main(String[] args) throws Exception {

        //1.获取连接对象
        Connection conn = MQUtils.getConnection();

        //2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上
        Channel channel = conn.createChannel();
        
        //3.声明了一个队列
        /**
         * queue – the name of the queue
         * durable – true代表创建的队列是持久化的(当mq重启后,该队列依然存在)
         * exclusive – 该队列是不是排他的 (该对立是否只能由当前创建该队列的连接使用)
         * autoDelete – 该队列是否可以被mq服务器自动删除
         * arguments – 队列的其他参数,可以为null
         */
     channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //开启 return 机制
        //编写回调方法
        channel.addReturnListener(new ReturnListener() {
            //如果消息没有成功发送到队列,这个方法会被调用
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("====================ReturnListener==================");
                System.out.println("replyCode:"+replyCode);
                System.out.println("replyText:"+replyText);
                System.out.println("exchange:"+exchange);
                System.out.println("routingKey:"+routingKey);
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body,"utf-8"));
                System.out.println("====================ReturnListener==================");
            }
        });


        String message = "Hello doubleasdasda!";

        //生产者如何发送消息,使用下面的方法即可
        /**
         * exchange – 交换机的名字 ,如果是空串,说明是把消息发给了默认交换机
         * routingKey – 路由的key,当发送消息给默认交换机时,routingkey代表队列的名字
         * other properties - 消息的其他属性,可以为null
         * body – 消息的内容,注意,要是有 字节数组
         */
        //注意:如果要使用生产者的return机制,需要在发送消息时,指定mandatory(强制性)为true
        channel.basicPublish("", "sadnaas", true,null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");



    Thread.sleep(1000);

     //   关闭资源
        channel.close();
        conn.close();
    }
}

这个必须要加上才能让rutern返回机制生效 

 

 3.消费者的实现方式

package com.qf.mq2302.hello;

import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

import java.io.IOException;

public class Recv {
   private  final  static  String QUEUE_NAME="hello-queue";

    public static void main(String[] args) throws Exception {
        //1.获取连接对象
        Connection conn = MQUtils.getConnection();

        //2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上
        Channel channel = conn.createChannel();

        /**
         * 第一个参数队列名称
         * 第二个参数,耐用性
         * 第三个参数排外性
         * 第四个参数是否自动删除
         * 第五个参数,可以定义什么类型的队列
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //3.该消费者收到消息之后的处理逻辑,写在DeliverCallback对象中
        DeliverCallback deliverCallback =new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                System.out.println(consumerTag);
            //从Delivery对象中可以获取到生产者,发送的消息的字节数组
                byte[] body = message.getBody();
                String msg = new String(body, "utf-8");

                //在这里写消费者的业务逻辑,例如,发送邮件
                System.out.println(msg);

            }
        };

        //4.让当前消费者开始消费(QUEUE_NAME)队列中的消息
        /**
         * queue – the name of the queue
         * autoAck – true 代表当前消费者是不是自动确认模式。true代表自动确认。
         * deliverCallback – 当有消息发送给该消费者时,消费者如何处理消息的逻辑
         * cancelCallback – 当消费者被取消掉时,如果要执行代码,写到这里
         */
      channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag -> {});





    }





}

3.整合springboot实现

1.导入依赖


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.yml配置文件

spring:
  rabbitmq:
    host: 8.140.244.227
    port: 6786
    username: test
    password: test
    virtual-host: /test
    publisher-returns: true #开启return机制

3.RabbitMQ配置文件

package com.qf.bootmq2302.config;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {


    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate();

        //设置连接工厂对象
        rabbitTemplate.setConnectionFactory(cachingConnectionFactory);

       // 开启return机制
        rabbitTemplate.setMandatory(true);

         rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
             @Override
             public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                 System.out.println("message:"+new String(message.getBody()));
                 System.out.println("replyCode:"+replyCode);
                 System.out.println("replyText:"+replyText);
                 System.out.println("exchange:"+exchange);
                 System.out.println("routingKey:"+routingKey);
             }
         });

            return rabbitTemplate;
    }



}

4.生产者的controller

    @Autowired
    RabbitTemplate rabbitTemplate;
    @GetMapping("/test1")
    public String test1(String msg,String routkey){
        System.out.println(msg);
        String exchangeName = "";//默认交换机
        String routingkey = routkey;//队列名字

        //生产者发送消息
        rabbitTemplate.convertAndSend(exchangeName,routingkey,msg);
        return "ok";
    }

5.消费者写一个队列

   @RabbitListener(queues = "queueA")
    public void getMsg1(Map<String,Object> data, Channel channel,Message message) throws IOException {


        System.out.println(data);

        //手动ack//若开启手动ack,不给手动ack,就按照 prefetch: 1 #等价于basicQos(1)的量,就这么多,不会多给你了,因为你没有确认。确认一条,就给你一条
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

    }

6.消费者的配置文件

spring:
  rabbitmq:
    host: 8.140.244.227
    port: 6786
    username: test
    password: test
    virtual-host: /test
    #手动ACK
    listener:
      simple:
        acknowledge-mode: manual  # 手动ack
        prefetch: 1 #等价于basicQos(1)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/989677.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于googlenet网络的动物种类识别算法matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2022a 3.部分核心程序 ................................................................. % 获取输入层的尺寸 Inp…

异步编程 - 13 高性能线程间消息传递库 Disruptor

文章目录 Disruptor概述Disruptor中的核心术语Disruptor 流程图 Disruptor的特性详解基于Disruptor实现异步编程 Disruptor概述 Disruptor是一个高性能的线程间消息传递库&#xff0c;它源于LMAX对并发性、性能和非阻塞算法的研究&#xff0c;如今构成了其Exchange基础架构的核…

NIFI使用InvokeHTTP发送http请求

说明 这里介绍四种平时常用的http请求方法&#xff1a;GET、POST、PUT、DELETE。 在官方的介绍文档中关于InvokeHTTP处理器的描述是这么说的&#xff1a; An HTTP client processor which can interact with a configurable HTTP Endpoint. The destination URL and HTTP Met…

java 企业工程管理系统软件源码 自主研发 工程行业适用

工程项目管理软件&#xff08;工程项目管理系统&#xff09;对建设工程项目管理组织建设、项目策划决策、规划设计、施工建设到竣工交付、总结评估、运维运营&#xff0c;全过程、全方位的对项目进行综合管理 工程项目各模块及其功能点清单 一、系统管理 1、数据字典&am…

网工内推 | 云架构运维、网络工程师,base北京,最高20k

01 协合新能源 招聘岗位&#xff1a;IT运维工程师 职责描述&#xff1a; 1、对集团内使用云计算架构&#xff08;Kubernetes&#xff09;的系统进行规划、运维及管理相关工作。 2、对集团数据中心系统的大数据基础架构&#xff08;Cloudera Distribution Hadoop&#xff09;的…

【办公类-16-06】20230901大班运动场地分配表-斜线排列、5天循环、不跳节日,手动修改节日”(python 排班表系列)

背景需求&#xff1a; 大班组长发来一个“运动排班”的需求表&#xff1a;“就是和去年一样的每个班的运动排班&#xff0c;就因为今年大班变成7个班&#xff0c;要重新做一份&#xff0c;不然我就用去年的那份了&#xff08;8个大班排班&#xff09;” &#xff08;拆了中8班…

STM32定时器的One Pulse Mode,OPM应用

文章目录 OPM应用1-精准延时应用2-精准定时 OPM T IMx_CR1的OPM位 位 3 OPM&#xff1a;单脉冲模式 (One-pulse mode) 0&#xff1a;计数器在发生更新事件时不会停止计数 1&#xff1a;计数器在发生下一更新事件时停止计数&#xff08;将 CEN 位清零&#xff09; 应用1-精准延时…

光学显微镜算法(OMA)(含MATLAB代码)

先做一个声明&#xff1a;文章是由我的个人公众号中的推送直接复制粘贴而来&#xff0c;因此对智能优化算法感兴趣的朋友&#xff0c;可关注我的个人公众号&#xff1a;启发式算法讨论。我会不定期在公众号里分享不同的智能优化算法&#xff0c;经典的&#xff0c;或者是近几年…

期权开户必读:费用、保证金和稳定性安全性必须兼备

期权开户的核心是判断50ETF方向&#xff0c;上涨下跌都能赚钱&#xff0c;其次选择0门槛期权平台要考量期权手续费和安全性是第一位&#xff0c;下文为大家科普期权开户的核心&#xff1a;费用、保证金和稳定性安全性必须兼备的知识点。本文来自 &#xff1a;期权酱 一、期权开…

如何把Android Framework学彻底?一条龙学习

Framework通俗易懂 平时学习 Android 开发的第一步就是去学习各种各样的 API&#xff0c;如 Activity&#xff0c;Service&#xff0c;Notification 等。其实这些都是 Framework 提供给我们的。Framework 层为开发应用程序提供了非常多的API&#xff0c;我们通过调用这些 API …

自然语言处理——数据清洗

一、什么是数据清洗 数据清洗是指发现并纠正数据文件中可识别的错误的最后一道程序&#xff0c;包括检查数据一致性&#xff0c;处理无效值和缺失值等。与问卷审核不同&#xff0c;录入后的数据清理一般是由计算机而不是人工完成。 ——百度百科 二、为什么要数据清洗 现实生…

Apipost:你API管理中的得力助手

API管理的难点在哪&#xff1f; 相信无论是前端&#xff0c;还是后端的测试和开发人员&#xff0c;都遇到过这样的困难。不同工具之间数据一致性非常困难、低效。多个系统之间数据不一致&#xff0c;导致协作低效、频繁出问题&#xff0c;开发测试人员痛苦不堪。 开发人员在 …

STM32F4X RTC

STM32F4X RTC 什么是RTCSTM32F4X RTCSTM32F4X RTC框图STM32F4X RTC计数频率STM32F4X RTC日历STM32F4X RTC闹钟 STM32F4X RTC例程 什么是RTC RTC全程叫Real-Time Clock实时时钟&#xff0c;是MCU中一个用来计时的模块。RTC的一个主要作用是用来显示实时时间&#xff0c;就像日常…

Visual Studio 2019下使用C++与Python进行混合编程——环境配置与C++调用Python API接口

前言 在vs2019下使用C与Python进行混合编程,在根源上讲&#xff0c;Python 本身就是一个C库&#xff0c;那么这里使用其中最简单的一种方法是把Python的C API来嵌入C项目中&#xff0c;来实现混合编程。当前的环境是&#xff0c;win10,IDE是vs2019,python版本是3.9&#xff0c…

一个帮各位填秋招表格省一点事的浏览器插件

最近应该很多和我一样的双非鼠鼠在秋招等面试&#xff0c;而且处于海投阶段&#xff0c;为了不忘记投了哪些公司&#xff0c;可以用这样一个表格来记录&#xff1a; 其中有些字段&#xff0c;比如状态、投递时间、查看进度的网址其实可以不手动输入&#xff0c;所以搞个插件来…

2023数模国赛C 题 蔬菜类商品的自动定价与补货决策-完整版创新多思路详解(含代码)

题目简评&#xff1a;看下来C题是三道题目里简单一些的&#xff0c;考察的点比较综合&#xff0c;偏数据分析。涉及预测模型和运筹优化(线性规划)&#xff0c;还设了一问开放型问题&#xff0c;适合新手入门&#xff0c;发挥空间大。 题目分析与思路&#xff1a; 背景&#x…

部署zookeeper集群

zookeeper和jdk下载地址 jdk 链接&#xff1a;https://pan.baidu.com/s/13GpNaAiHM5HSDJ66ebBtEg 提取码&#xff1a;90se zookeeper 链接&#xff1a;https://pan.baidu.com/s/1nSFKEhSGNiwgSPZWdb7hkw 提取码&#xff1a;u5l2 在所有的机器上面执行下面步骤&#xff1a; 1.上…

C++的纯虚函数和抽象类

在C++中,可以将虚函数声明为纯虚函数,语法格式为: virtual 返回值类型 函数名 (函数参数) = 0; 纯虚函数没有函数体,只有函数声明,在虚函数声明的结尾加上=0,表明此函数为纯虚函数。 最后的=0并不表示函数返回值为0,它只起形式上的作用,告诉编译系统“这是纯虚函数”。…

继承的偏移量问题

下面是实际测试&#xff1a; p1 p3 ! p2 Base1* p1 &d; Derive* p3 &d;! Base2* p2 &d; 图解&#xff1a;

斯坦福小镇升级版——AI-Town搭建指南

导语&#xff1a; 8月份斯坦福AI小镇开源之后&#xff0c;引起了 AIGC 领域的强烈反响&#xff0c;但8月份还有另一个同样非常有意义的 AI-Agent 的项目开源&#xff0c;a16z主导的 AI-Town 本篇文章主要讲解如何搭建该项目&#xff0c;如有英文基础或者对这套技术栈熟悉&#…