【谷粒商城之消息队列RabbitMQ】

news2025/1/15 17:24:06

本笔记内容为尚硅谷谷粒商城消息队列RabbitMQ部分

目录

一、概述

二、简介

三、Docker安装RabbitMQ

四、Springboot整合RabbitMQ 

1、引入spring-boot-starter-amqp

2、application.yml配置

3、测试RabbitMQ

1. AmqpAdmin-管理组件

2.RabbitTemplate-消息发送处理组件

3.RabbitListener&RabbitHandle接收消息

4.RabbitMQ消息确认机制-可靠抵达

5.可靠抵达-Ack消息确认机制


一、概述


二、简介


三、Docker安装RabbitMQ


docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p
25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

  • 4369, 25672 (Erlang发现&集群端口)
  • 5672, 5671 (AMQP端口)
  • 15672 (web管理后台端口)
  • 61613, 61614 (STOMP协议端口)
  • 1883, 8883 (MQTT协议端口) 

Networking and RabbitMQ — RabbitMQ

四、Springboot整合RabbitMQ 


1、引入spring-boot-starter-amqp

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、application.yml配置

spring.rabbitmq.host=192.168.88.130
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/

3、测试RabbitMQ

1. AmqpAdmin-管理组件

    @Autowired
    AmqpAdmin amqpAdmin;

    /**
     * 1、如何创建Exchange[hello.java.exchange]、Queue、Binding
     *      1)、使用AmqpAdmin进行创建
     */
    // 创建交换机
    @Test
    void createExchange() {
        //DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
        //durable:是否持久化
        DirectExchange directExchange = new DirectExchange("hello-java-exchange",true,false);
        amqpAdmin.declareExchange(directExchange);
        log.info("exchange:[{}]创建成功","hello-java-exchange");
    }

    // 创建队列
    @Test
    void createQueue() {
        //exclusive:是否排他的,false:允许同时有多个连接到此queue
        Queue queue = new Queue("hello-java-queue",true,false,false);
        amqpAdmin.declareQueue(queue);
        log.info("queue:[{}]创建成功","hello-java-queue");
    }

    @Test
    void createBinding(){
        // 创建绑定
        // String destination【目的地】,
        // DestinationType destinationType 【目的地类型】
        // String exchange【交换机】,
        // String routingKey【路由键】,
        // Map<String, Object> arguments【自定义参数】
        // 将exchange指定的交换机和destination目的地进行绑定,使用routingKey作为指定的路由键
        Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,
                "hello-java-exchange",
                "hello.java",null);
        amqpAdmin.declareBinding(binding);
        log.info("binding:[{}]创建成功","hello-java-binding");
    }

2.RabbitTemplate-消息发送处理组件

    @Test
    void sendMessageTests() {

        //1、发送消息,如果发送的消息是个对象,我们会使用序列化机制,将对象写出去。对象必须实现Serializable
        String msg = "Hello World";
        OrderReturnReasonEntity entity = new OrderReturnReasonEntity();
        entity.setId(1L);
        entity.setCreateTime(new Date());

        //2、发送的对象类型的消息,可以是一个json
        for (int i = 0;i<10;i++) {
            if(i%2==0) {
                entity.setName("Vc" + i);
                rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", entity);
            }else {
                OrderEntity orderEntity = new OrderEntity();
                orderEntity.setOrderSn(UUID.randomUUID().toString());
                rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderEntity);
            }
            log.info("消息发送完成{}" + entity);
        }
    }

使用json格式的序列化器

    //使用json格式的序列化器
    //否则使用jdk的序列化器
    @Configuration
public class MyRabbitConfig {

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

3.RabbitListener&RabbitHandle接收消息

    1)必须使用@EnableRabbit
    2)监听方法必须放在@Component中
    3)@RabbitListener(queues={"hello-java-queue"})放在类上
         @RabbitHandler:标在方法上【作用:重载处理不同类型的数据】
@RabbitListener(queues = {"hello-java-queue"})
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService 
    /**
     * queue:声明需要监听的所有队列
     *
     * org.springframework.amqp.core.Message
     *
     * 参考可以写一下类型
     * 1、Message message:原生消息详细信息。头+体
     * 2、T<发送的消息的类型> OrderReturnReasonEntity content
     * 3、Channel channel: 当前传输数据的通道
     *
     * Queue: 可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息
     * 场景:
     *      1)、订单服务启动多个;同一个消息,只能有一个客户端收到
     *      2)、只有一个消息完全处理完,方法运行结束,我们就可以接收到下一个消息
     * @param message
     */
    //RabbitListener(queues = {"hello-java-queue"})
    @RabbitHandler
    public void recieveMessage(Message message,
                               OrderReturnReasonEntity content,
                               Channel channel) throws InterruptedException {
        //{"id":1,"name":"哈哈","sort":null,"status":null,"createTime":1652100907404}
        System.out.println("接收到消息..."+content);
        byte[] body = message.getBody();
        //消息头属性信息
        MessageProperties properties = message.getMessageProperties();
        Thread.sleep(3000);
        System.out.println("消息处理完成=》"+content.getName());
    }

    @RabbitHandler
    public void recieveMessage2(OrderEntity content) {
        //{"id":1,"name":"哈哈","sort":null,"status":null,"createTime":1652100907404}
        System.out.println("接收到消息..."+content);
    }

}

4.RabbitMQ消息确认机制-可靠抵达

注意:springboot.rabbitmq.publisher-confirm 已被弃用.

#新版使用
#开启发送端确认
spring.rabbitmq.publisher-confirm-type=correlated

#开启发送端消息抵达队列的确认
spring.rabbitmq.publisher-returns=true
#只要抵达队列,以异步发送优先回调我们这个return
spring.rabbitmq.template.mandatory=true

示例代码: 

@Configuration
public class MyRabbitConfig {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 定制RabbitTemplate
     * 1、服务收到消息就会回调
     *      1、spring.rabbitmq.publisher-confirms: true
     *      2、设置确认回调
     * 2、消息正确抵达队列就会进行回调
     *      1、spring.rabbitmq.publisher-returns: true
     *         spring.rabbitmq.template.mandatory: true
     *      2、设置确认回调ReturnCallback
     *
     * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
     *
     */
    @PostConstruct  //MyRabbitConfig对象创建完成以后,执行这个方法
    public void initRabbitTemplate() {

        /**
         * 1、只要消息抵达Broker就ack=true
         * correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
         * ack:消息是否成功收到
         * cause:失败的原因
         */
        //设置确认回调
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
            System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
        });


        /**
         * 只要消息没有投递给指定的队列,就触发这个失败回调
         * message:投递失败的消息详细信息
         * replyCode:回复的状态码
         * replyText:回复的文本内容
         * exchange:当时这个消息发给哪个交换机
         * routingKey:当时这个消息用哪个路邮键
         */
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
            System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
                    "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
        });
    }
}

 

5.可靠抵达-Ack消息确认机制

#手动ack消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual

示例代码: 

    @RabbitHandler
    public void recieveMessage(Message message,
                               OrderReturnReasonEntity content,
                               Channel channel) throws InterruptedException {
        //{"id":1,"name":"哈哈","sort":null,"status":null,"createTime":1652100907404}
        System.out.println("接收到消息..."+content);
        byte[] body = message.getBody();
        //消息头属性信息
        MessageProperties properties = message.getMessageProperties();
        Thread.sleep(3000);
        System.out.println("消息处理完成=》"+content.getName());
        //Channel内按顺序自增
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        //签收货物,非批量模式
        try {

            if(deliveryTag%2==0){
                //收货
                channel.basicAck(deliveryTag,false);
                System.out.println("签收了货物..."+deliveryTag);
            }else {
                //退货 requeue=false 丢弃 requeue=true 发回服务器,服务器重新入队
                //deliveryTag, multiple, requeue(false不再入队)
                channel.basicNack(deliveryTag,false,false);
                System.out.println("没有签收了货物..."+deliveryTag);
            }
        } catch (Exception e) {
            //网络中断

        }
    }

结束!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/491532.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Wikidata实操

1. Wikidata 简介 Wikidata 即维基数据&#xff0c;是维基百科的一个项目。个项目已经在维基百科德国分部开始进行&#xff0c;项目完成之后&#xff0c;将会交给维基百科基金会进行操作和维护。&#xff08;具体百度即可&#xff0c;不多赘述&#xff09; 官网&#xff1a;htt…

操作系统考试复习—第三章 优先级倒置 死锁问题

当前OS广泛采用优先级调度算法和抢占方式&#xff0c;然而在系统中存在着影响进程运行的资源从而可能产生"优先级倒置"现象 具体解释为&#xff1a;在原本的调度算法设计中&#xff0c;高优先级进程可以抢占低优先级的CPU资源&#xff0c;先执行高优先级任务。但是存…

【STM32】在使用STM32Cube.IDE时更改时钟频率后代码跳进异常中断

目录 1、前言2、问题与复现办法3、解决的问题的过程 1、前言 这是在项目中无意发现的问题&#xff0c;其实有同样更复杂的工程可以运行&#xff0c;但是后来发现新建一个简单工程反而运行不了了&#xff0c;但是同样更复杂的工程可以运行说明本来同事原来已经不知道在哪里找到…

Vmware安装Kali

需要准备两个东西&#xff0c;kali镜像和VMware软件 下载kali iso 下载界面有三个可选择的 install是安装版&#xff0c;安装使用&#xff1b; Live版可以直接启动运行&#xff1b; netinstaller是网络安装&#xff0c;需要从网络上下载&#xff0c;文件本身只有引导作用&…

Idea Jrebel 报错:Cannot reactivate, offline seat in use ...

Idea Jrebel 报错&#xff1a;Cannot reactivate, offline seat in use ... 一、问题描述 在使用idea Jrebel续期的时候&#xff0c;修改idea激活服务器地址时&#xff0c;遇到报错&#xff1a;Cannot reactivate, offline seat in use. Click Work online in JRebel configura…

基于aspnet个人博客网站dzkf6606程序

系统使用Visual studio.net2010作为系统开发环境&#xff0c;并采用ASP.NET技术&#xff0c;使用C#语言&#xff0c;以SQL Server为后台数据库。 1&#xff0e;系统登录&#xff1a;系统登录是用户访问系统的路口&#xff0c;设计了系统登录界面&#xff0c;包括用户名、密码和…

探索卡尔曼滤波在位姿估计中的魅力:无人机与自动驾驶的关键技术揭秘

摘要&#xff1a;在本博客中&#xff0c;我们将探讨卡尔曼滤波在位姿估计领域的应用&#xff0c;特别是在无人机和自动驾驶场景中的重要性。我们将详细介绍卡尔曼滤波的原理、优势及其在无人机、自动驾驶等实际案例中的应用。此外&#xff0c;我们还将关注卡尔曼滤波在其他领域…

【服务器数据恢复】同友存储上的虚拟机数据恢复案例

服务器数据恢复环境&#xff1a; 同友存储&#xff0c;底层由数块物理硬盘组建的raid5磁盘阵列&#xff0c;存储池划分若干lun&#xff0c;每个lun下有数台虚拟机。 服务器故障&#xff1a; 未知原因导致存储崩溃&#xff0c;无法启动&#xff0c;虚拟机全部丢失&#xff0c;其…

linux中基础开发工具的使用

1.linux中的软件包管理器 1.1什么是软件包 在Linux下安装软件, 一个通常的办法是下载到程序的源代码, 并进行编译, 得到可执行程序.但是这样太麻烦了, 于是有些人把一些常用的软件提前编译好, 做成软件包(可以理解成windows上的安装程序)放在一个服务器上, 通过包管理器可以很…

软件管理员密码的作用 如何设置软件管理员密码?

在使用夏冰加密软件的过程中&#xff0c;很多软件都是可以设置软件管理员密码的。那么你知道管理员密码有什么用吗&#xff1f;又该如何设置软件管理员密码呢&#xff1f;下面我们来了解一下吧。 软件管理员密码是什么意思&#xff1f; 软件管理员密码就是软件的密码&#xff…

毕业5年,技术越来越好,混的却越来越差...

别人都是越来越好&#xff0c;而我是越来越差&#xff01; 17年&#xff0c;从一个普通的本科毕业&#xff0c;那个时候的我&#xff0c;很迷茫&#xff0c;简历上的求职岗位都不知道写什么&#xff0c;因为家里是农村的&#xff0c;朴实的父母也帮不上什么忙&#xff0c;关于…

KDBR-IV变压器空负载短路损耗测试仪

一、产品概述 本产品是我公司针对不良电力用户偷逃基本电费、私自增容问题而研发设计的仪器&#xff0c;用于变压器容量、空载、负载等特性参数测量的高精密仪器。本仪器为多功能测量仪器&#xff0c;相当于往常两种测试仪器&#xff1a;即变压器容量测试仪变压器特性参数测试仪…

【因子挖掘】遗传规划概述

在多因子选股的框架下&#xff0c;因子的产生通常有两条途径&#xff1a; 先有逻辑&#xff0c;后有公式&#xff1a;根据经济学逻辑、历史经验、直觉进行人工构造一些因子&#xff1b; 例如&#xff1a;动量&#xff08;Momentum&#xff09;因子&#xff1a;当最近的股价呈现…

Cadence Allegro 布局操作Move命令的应用

在布局的时候&#xff0c;常常需要对一些元素去进行移动位置以方便进行设计。 1、执行菜单命令Edit-Move&#xff0c;此时PCB界面的左下角会显示Move&#xff0c;就表示正在执行移动命令&#xff0c;如图1所示。 图1 移动命令 2、在PCB界面右边的Find面板中所选择需要进行移动…

Charles抓包工具使用

一、Charles的安装与激活 安装 官方地址&#xff1a;https://www.charlesproxy.com/ 根据自己系统安装最新版本即可 安装后可直接打开使用 激活 打开Charles -> 【Help】 -> 【Register Charles】 -> 输入 Registered Name &#xff1a; https://zhile.io Lic…

智能座舱的“宏大蓝图”和“残酷现实”

配图来自Canva可画 2023年上海车展各大车企发布新车、新配置和新战略好不热闹&#xff0c;“智能驾驶”、“智能座舱”等关键词频频出现&#xff0c;智能化已然成为车企技术比拼的关键。 Unity中国发布最新智能座舱解决方案&#xff0c;可为车企提供成熟、可量产落地的HMI&…

学系统集成项目管理工程师(中项)系列17b_范围管理(下)

1. 创建工作分解结构WBS 1.1. 自上而下的分解结构 1.2. 把项目可交付成果和项目工作分解成较小的、更易于管理的组件的过程 1.3. 用来确定项目范围的 1.3.1. 包括分包出去的工作 1.3.1.1. 【21上选40】 1.4. 输入 1.4.1. 项目范围管理计划 1.4.2. 项目范围说明书 1.4.…

AI教父变成“吹哨人” 他到底在警觉什么?

“我现在对自己过去的工作感到后悔&#xff0c;我找借口来安慰自己&#xff1a;就算我没做&#xff0c;别人也会做的。”有AI“教父”之称的杰弗里辛顿 (Geoffrey Hinton)在接受媒体采访时透露出悔意。 作为AI深度学习领域的代表性人物&#xff0c;辛顿一生都在该领域深耕&…

随笔-听说你年入百万了

两个月前接到老代的电话&#xff0c;说4月30号结婚&#xff0c;预约一下时间。半个月前接到小付的电话&#xff0c;说5月1号结婚&#xff0c;行吧&#xff0c;值当回趟老家了。 抢票还算顺利&#xff0c;转了一趟车&#xff0c;29号下午到了老家&#xff0c;想着收拾一下&…

集成ES全文检索、Neo4J知识图谱、Activiti工作流的知识库管理系统

一、项目介绍 一款全源码&#xff0c;可二开&#xff0c;可基于云部署、私有部署的企业级知识库云平台&#xff0c;一款让企业知识变为实打实的数字财富的系统&#xff0c;应用在需要进行文档整理、分类、归集、检索、分析的场景。 获取方式q:262086839 为什么建立知识库平台&…