RabbitMQ工作模式2 整合springboot 和MQ高级特性

news2024/11/26 2:50:17

RabbitMQ工作模式

1.路由模式

创建交换机 , 连接队列 (生产者)

public class MyTestExDirect {
    @Test
    public void bbb() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //连接mq
        connectionFactory.setUsername("账号");
        connectionFactory.setPassword("密码");
        connectionFactory.setHost("ip地址");
        connectionFactory.setPort(端口号);
        connectionFactory.setVirtualHost("/aaa");
        //建立连接
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //创建交换机
        channel.exchangeDeclare("ex_direct", BuiltinExchangeType.DIRECT,false);
        //创建队列
        /**
         * String queue, 队列的名称
         * boolean durable, 持久化
         * boolean exclusive, 是否独占
         * boolean autoDelete,  受否自动删除
         * Map<String, Object> arguments  参数
         */
        channel.queueDeclare("mydirect1",false,false,false,null);
        channel.queueDeclare("mydirect2",false,false,false,null);
        //绑定交换机和队列   设置routingkey
        channel.queueBind("mydirect1","ex_direct","error");
        channel.queueBind("mydirect2","ex_direct","test");
        channel.queueBind("mydirect2","ex_direct","test2");
        //交换机     routingkey     根据routingkey在队列上发布消息
        channel.basicPublish("ex_direct","error",null,"路由模式测试".getBytes());
    }
}

启动测试

交换机创建成功

队列创建成功 , 与交换机连接成功

通过routingkey "error" 将消息发送到 mydirect1

创建消费者

public class ConsumerAppDirect
{
    public static void main( String[] args ) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //连接mq
        connectionFactory.setUsername("账号"); 
        connectionFactory.setPassword("密码"); 
        connectionFactory.setHost("ip地址");
        connectionFactory.setPort(端口号); 
        connectionFactory.setVirtualHost("/aaa");
        //建立连接
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("mq:-----aaa"+s);
            }
        };
        channel.basicConsume("mydirect1",true,consumer);
    }
}

开启监控

2.Topics 主题模式

Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

#:匹配一个或多个词 

*:匹配不多不少恰好1个词   test.* test.insert

举例:

item.#:能够匹配item.insert.abc 或者 item.insert

item.*:只能匹配item.insert 

创建交换机和生产者

public class MyTestExTopics {
    @Test
    public void ccc() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //连接mq
        connectionFactory.setUsername("账号");
        connectionFactory.setPassword("密码"); 
        connectionFactory.setHost("ip地址");
        connectionFactory.setPort(端口号);
        connectionFactory.setVirtualHost("/aaa");
        //建立连接
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //创建交换机
        channel.exchangeDeclare("ex_topics", BuiltinExchangeType.TOPIC,false);
        //创建队列
        /**
         * String queue, 队列的名称
         * boolean durable, 持久化
         * boolean exclusive, 是否独占
         * boolean autoDelete,  受否自动删除
         * Map<String, Object> arguments  参数
         */
        channel.queueDeclare("mytopics1",false,false,false,null);
        channel.queueDeclare("mytopics2",false,false,false,null);
        //绑定交换机和队列   设置routingkey
        channel.queueBind("mytopics1","ex_topics","test.#");
        channel.queueBind("mytopics2","ex_topics","*.aaa");
        channel.queueBind("mytopics2","ex_topics","test.*");
        //交换机     此处的routingkey应该是具体的值     根据routingkey在队列上发布消息
        channel.basicPublish("ex_topics","test.aaa",null,"TOPIC模式测试".getBytes());


    }
}

测试

发布消息成功

消费者监听参考路由模式 , 只需要修改队列就行

SpringBoot整合RabbitMQ

1.搭建项目

添加依赖

<!--2. rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

添加配置文件

2.创建工作模式(主题模式)

1)创建交换机和队列

package com.example.config;


import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class TopicMqConfig {
    @Value("${mq.exchange.name}")
    private String EXCHANGENAME;
    @Value("${mq.queue.name}")
    private String QUEUENAME1;
    @Value("${mq.queue.name}")
    private String QUEUENAME2;
    //创建交换机
    @Bean("ex1")
    public Exchange getExchange(){
        Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build();
        return exchange;
    }
    //创建队列
    @Bean("queue1")
    public Queue getQueue1(){
        Queue queue1 = QueueBuilder.nonDurable(QUEUENAME1).build();
        return queue1;
    }
    @Bean("queue2")
    public Queue getQueue2(){
        Queue queue2 = QueueBuilder.nonDurable(QUEUENAME1).build();
        return queue2;
    }
    //绑定交换机和队列
    @Bean("binding1")
    public Binding bindingQueueToExchange1(@Qualifier("ex1") Exchange exchange,@Qualifier("queue1") Queue queue){
        Binding binding1 = BindingBuilder.bind(queue).to(exchange).with("*.*").noargs();
        return binding1;
    }
    @Bean("binding2")
    public Binding bindingQueueToExchange2(@Qualifier("ex1") Exchange exchange,@Qualifier("queue2") Queue queue){
        Binding binding2 = BindingBuilder.bind(queue).to(exchange).with("test.*").noargs();
        return binding2;
    }
}

2)创建生产者

测试

3)创建消费者

创建配置文件

创建测试类 监听队列
package com.example.message;


import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ConsumerMessage {
    @RabbitListener(queues = "test_queue2")
    public void xxx(Message message){
        byte[] body = message.getBody();
        String s = new String(body);
        System.out.println(s);
    }
}

测试

MQ高级特性,消息的可靠性传递

1.确认模式

开启确认模式 修改配置

创建测试类

@SpringBootTest
public class MqTtst {
    @Value("${mq.exchange.name}")
    private String EXCHANGENAME;
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Test
    void sendMsg(){
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                if (b){
                    System.out.println("发送消息成功");
                }else {
                    System.out.println("发送消息失败,原因:"+s);
                }
            }
        });
        rabbitTemplate.convertAndSend(EXCHANGENAME,"test.topic","测试springBoot");
    }
}

启动测试

2.消息回退

当交换机接收到消息 , 但队列收不到消息时 , 使用回退

修改配置

测试

@Test
void sendMsgReturn(){
    //  消息回退
    rabbitTemplate.setMandatory(true);//
    rabbitTemplate.setReturnsCallback(returnedMessage -> System.out.println("消息回退,回退的消息是:"+new String(returnedMessage.getMessage().getBody())));
    rabbitTemplate.convertAndSend(EXCHANGENAME,"test.topic","测试springBoot");
}

3.Consumer Ack

三种确认方式

 自动确认:acknowledge="none" 。不管处理成功与否,业务处理异常也不管

(当消费者意担接收到消息之后,消费者就会给broker一个回执,证明已经接收到消息 了,不管消息到底是否成功)

手动确认:acknowledge="manual" 。可以解决业务异常的情况

(收到消息之后不会立马确认收到消息,当业务处理没有问题的时候手动的调用代码的方 式来进行处理,如果业务失败了,就可以进行额外的操作)

根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

1)自动确认

2)手动确认

修改配置 开启手动签收

3)创建测试

@Component
public class ShouDingQianShouMeaasge implements ChannelAwareMessageListener {
    @Override
    @RabbitListener(queues = "test_queue2")
    public void onMessage(Message message, Channel channel) throws Exception {
      Thread.sleep(2000);
        byte[] body = message.getBody();
        String s = new String(body);
        System.out.println(s);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println(1/0);
            channel.basicAck(deliveryTag,true);
        }catch (Exception e){
            System.out.println("拒绝签收");
            channel.basicNack(deliveryTag,true,true);
        }
    }
}

启动测试

有异常拒绝签收

无异常签收成功

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1257783.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vue3中调用外部iframe链接方法

业务场景&#xff0c;点击某个按钮需要跳转到外部iframe的地址&#xff0c;但是需要在本项目内显示。以前项目中写过调用外部链接的功能&#xff0c;是有菜单的&#xff0c;但是这次是按钮&#xff0c;所以不能直接把地址配到菜单里。 实现方法&#xff1a;在本地路由文件里写个…

neo4j在Linux上安装及使用

1、简介 neo4j安装主要有两个步骤&#xff1a; 环境配置&#xff1a;Java安装工具下载&#xff1a;neo4j安装 2、java 安装 2.1 检查 安装前可以检查下&#xff0c;当前环境是否有Java 查看是否安装&#xff1a;java -version 说明当前环境没有&#xff0c;那么去下载 …

推荐你一个基于Koin, Ktor Paging等组件的KMM Compose Multiplatform项目

推荐你一个基于Koin, Ktor & Paging等组件的KMM Compose Multiplatform项目 Kotlin Multiplatform Mobile&#xff08;KMM&#xff09;已经从一个雄心勃勃的想法发展成为一个稳定而强大的框架&#xff0c;为开发人员提供了在多个平台上无缝共享代码的能力。通过最近的稳定…

2023.11.25电商项目平台建设2 -四大业务之核销主题建模

1.数仓建模步骤 自下而上 ADS-DWS-DWM-DWD 2.DWD方案(清洗转换,降维拉宽) DWD层的表 dwd_sale_store_sale_dtl_i 门店销售明细宽表 维度dim 销售sale合成成的宽表 dwd_dim_date_f 日期表 store_sale_dtl 门店销售明细表 dwd_sale_store_sale_dtl_i 门店销售明细表 …

漏洞分析 | 经典的Shiro反序列化

0x01、前言 相信大家总是面试会问到java反序列化&#xff0c;或者会问到标志性的漏洞&#xff0c;比如shiro反序列化&#xff0c;或者weblogic反序列化漏洞。 那我就这篇文章为大家讲解一下&#xff0c;不懂的哥哥直接背一下&#xff0c;理解一下就好了。 至于为什么要选择sh…

力扣(LeetCode)907. 子数组的最小值之和(C++)

枚举 请对题目有疑惑的小伙伴看枚举思想&#xff0c;有助于掌握最基本的解题思路。对于本题数据范围&#xff0c;枚举算法会超时。 请看题目描述&#xff1a;给定一个整数数组 arr&#xff0c;找到 min(b) 的总和&#xff0c;其中 b 的范围为 arr 的每个&#xff08;连续&…

锂电池污水如何处理

锂电池是目前应用广泛的重要电池类型&#xff0c;然而其生产过程和废弃处理中产生的污水对环境造成了不可忽视的影响。本文将探讨锂电池污水的处理方法&#xff0c;以期为环境保护和可持续发展作出贡献。 首先&#xff0c;了解锂电池污水的组成是解决问题的关键。锂电池污水通…

解释LED显示屏的裸眼3D特效原理

LED电子大屏幕的3D特效技术正在不断发展&#xff0c;而实现这一技术的原理主要包括分光、分色、分时和光栅等四种方法。这些原理都有各自的特点和应用场景&#xff0c;下面将对它们进行详细介绍。 1. 分光方法 分光方法是一种基于偏振光的3D显示技术。通过使用偏振滤镜或偏振片…

webshell之编码免杀

Unicode编码 jsp支持unicode编码&#xff0c;如果杀软不支持unicode查杀的话&#xff0c;基本上都能绕过 注意这里的\uuuu00可以换成\uuuu00uuu...可以跟多个u达到绕过的效果 将代码&#xff08;除page以及标签&#xff09;进行unicode编码&#xff0c;并条件到<%%>标签…

Drupal Core 8 PECL YAML 反序列化任意代码执行漏洞(CVE-2017-6920)

漏洞描述 影响软件&#xff1a;Drupal方式&#xff1a;反序列化参考链接&#xff1a;CVE-2017-6920:Drupal远程代码执行漏洞分析及POC构造效果&#xff1a;任意代码执行 漏洞环境及利用 搭建docker环境 环境启动后&#xff0c;访问 将会看到drupal的安装页面&#xff0c;一路…

仅2万粉,带了2.6万件的货!TikTok Shop美区达人周榜(11.13-11.19)

11月24日&#xff0c;TikTok Shop近日公布了美国市场和英国市场的全托管黑五大促战绩。数据显示&#xff0c;11月14日至11月20日&#xff0c;其美国市场的订单量环比10月20日-10月26日增长了205%。 家居户外热销品有&#xff1a;数码触摸屏相框、毛绒地毯、家居毛毯。黑马商品…

Callable、Future和FutrueTask详解

一、Callable介绍 1.1 Runnable介绍 Runnable是一个接口&#xff0c;里面声明了run方法。但是由于run方法返回值类型为void&#xff0c;所以在执行完成任务后&#xff0c;无法返回任何结果。 FunctionalInterface public interface Runnable {public abstract void run(); }…

手机爬虫用Fiddler详细教程

如果你正在进行手机爬虫的工作&#xff0c;那么一款强大而又实用的网络调试工具Fiddler将会是你的好帮手。今天&#xff0c;我将和大家分享一份详细的Fiddler教程&#xff0c;教你如何使用它来轻松捕获和分析手机App的网络请求。让我们一起来探索Fiddler的功能和操作&#xff0…

内衣洗衣机怎么选?内衣洗衣机便宜好用的牌子推荐

相信不少用户并不太在意衣服和内衣裤裤能不能同时洗&#xff0c;每次清洗都是把内衣裤与其他衣服一起放入洗衣机清洗&#xff0c;其实内衣裤不能直接跟大件的衣物一起放入洗衣机洗的&#xff0c;很容易会造成我们皮肤的瘙痒&#xff0c;我们大部分时间都在户外&#xff0c;暴露…

2023年11月27日历史上的今天大事件早读

1852年11月27日 计算机程序创始人阿达-洛芙莱斯去世 1893年11月27日 抗日爱国将领续范亭诞辰 1895年11月27日 《茶花女》作者、法国著名作家小仲马逝世 1899年11月27日 董其武将军诞辰 1902年11月27日 《新小说》创刊 1907年11月27日 割让刚果给比利时的条约签订 1925年1…

AMD ROCm软件栈组件介绍

AMD ROCm™ Platform 1.1 ROCm简介 参考&#xff1a;https://github.com/RadeonOpenCompute/ROCm ROCm&#xff08;Radeon Open Compute&#xff09;开源软件栈。 在NVIDIA GPU上&#xff0c;术语“CUDA”通常是指GPU编程编译器、API和运行时库&#xff0c;但ROCm不那么单一…

C语言基础篇5:指针(一)

指针是C语言的核心、精髓所在&#xff0c;用好了指针可以在C语言编程中起到事半功倍的效果。指针一方面可以提高程序的编译效率和执行速度&#xff0c;而且还可以通过指针实现动态的存储分配&#xff0c;另一方面使用指针可使程序更灵活&#xff0c;便于表示各种数据结构&#…

数据结构与算法编程题26

计算二叉树深度 #define _CRT_SECURE_NO_WARNINGS#include <iostream> using namespace std;typedef char ElemType; #define ERROR 0 #define OK 1 #define Maxsize 100 #define STR_SIZE 1024typedef struct BiTNode {ElemType data;BiTNode* lchild, * rchild; }BiTNo…

Python基础:字符串(String)详解(需补充完善)

1. 字符串定义 在Python中&#xff0c;字符串是一种数据类型&#xff0c;用于表示文本数据。字符串是由字符组成的序列&#xff0c;可以包含字母、数字、符号和空格等字符。在Python中&#xff0c;你可以使用单引号&#xff08;&#xff09;或双引号&#xff08;"&#xf…