springboot与rabbitmq的整合【演示5种基本交换机】

news2024/11/16 12:05:26

前言
👏作者简介:我是笑霸final,一名热爱技术的在校学生。
📝个人主页:个人主页1 || 笑霸final的主页2
📕系列专栏:后端专栏
📧如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步👀
🔥如果感觉博主的文章还不错的话,👍点赞👍 + 👀关注👀 + 🤏收藏🤏

话不多说 直接开干

目录

  • 一 导入maven坐标与配置
  • 二、直连交换机direct exchange
    • 2.1配置类QueueConfig
    • 2.2消息提供者
    • 2.2消息消费者
    • 2.3测试类
  • 三、默认交换机default exchange
    • 3.1配置类和消息提供者
    • 3.2消息消费者
    • 3.3测试结果
  • 四、扇型交换机fanout exchange
    • 4.1配置类
    • 4.2消息提供者
    • 4.3消息消费者
    • 4.4测试类
  • 五、主题交换机topic exchanges
    • 5.1配置类
    • 5.2消息提供者
    • 5.3消息消费者
    • 5.4测试
  • 六、头交换机 headers exchange
    • 6.1配置类
    • 6.2创建消息提供者
    • 6.3消息消费者
    • 6、4测试结果

一 导入maven坐标与配置

<!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

基础配置文件

spring:
  rabbitmq:
    username: 你的用户名
    password: 你的密码
    host: rabbitmq安装的主机的 ip地址
    port: 5672 #端口号

二、直连交换机direct exchange

直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的。

  • 将一个队列 绑定到 某个交换机上,同时赋予该绑定一个路由键(routing key)
  • 当一个携带着路由键为routingKey01的消息被发送给直连交换机时,交换机会把它路由给绑定值同样为routingKey01的队列。

直连交换机经常用来循环分发任务给多个工作者(workers)。当这样做的时候,我们需要明白一点,在AMQP 0-9-1中,消息的负载均衡是发生在消费者(consumer)之间的,而不是队列(queue)之间。

在这里插入图片描述

2.1配置类QueueConfig

@Configuration
public class QueueConfig {

    /**
     * 创建一个队列  队列名为direct1
     * */
    @Bean
    public Queue queue01(){
    
        return new Queue("direct1",true);//true表示持久化
    }

    /**
     * 创建一个直连交换机 名为directExchange
     * */
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("directExchange");
    }

    /**
     * 在让队列和直连交换机绑定在一起
     * */
    @Bean
    public Binding binding(){
        Binding binding= BindingBuilder
                .bind(queue01())
                .to(directExchange()).with("routingKey01");
        return binding;
    }

}

2.2消息提供者

@Component
public class MqProducer {
    
    @Resource
    private RabbitTemplate rabbitTemplate;
    
    public void sent_test(Object o){
        //convertAndSend(交换机的名字,交换机中路由键名称,参数)
        rabbitTemplate.convertAndSend(
                "directExchange",//交换机名字
                "routingKey01",//路由key
                o);

    }
}

2.2消息消费者


@Component
@Slf4j
public class MqConsumer {
    
    /**
     * 接收消息
     */
    @RabbitListener(queues = {"direct1"})
    public void receivedD(Message message, Channel channel)throws Exception{
        String msg=new String(message.getBody());
        log.info("当前时间:{},消费者1收到消息:{}",new Date().toString(),msg);
    }

}

我写了两个消费者内容一致

2.3测试类

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class SpringRunnerTest {
    @Resource
    private MqProducer mqProducer;//注入消息提供者
    @Test
    public void test_send() throws InterruptedException {
        // 循环发送消息
        while (true) {
            mqProducer.sent_test("你好,我是Lottery 001");
            Thread.sleep(3500);
        }
    }
}

测试结果
在这里插入图片描述

三、默认交换机default exchange

默认交换机(default exchange)实际上一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称队列名称 相同

3.1配置类和消息提供者

/**
*配置类
*/
@Configuration
public class QueueConfig {
//只需要创建一个队列
//每个`新建队列`(queue)都会`自动`绑定到`默认交换机`上,
//绑定的`路由键(routing //key)名称`与`队列名称` 相同
    @Bean
    public Queue queue02(){
        return new Queue("def");
    }

}
/**
*消息提供者
*/
@Component
public class MqProducer {
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void def_sent_test(Object obj){
        //convertAndSend(交换机的名字,交换机中路由键名称,参数)
        rabbitTemplate.convertAndSend(
                //没有名字(名字为空字符串)
                "",
                "def",
                obj);//消息内容
    }
}

默认交换机名字是空字符串 。每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称队列名称 相同

3.2消息消费者

@Component
@Slf4j
public class MqConsumer {

    /**
     * 接收消息
     */
    @RabbitListener(queues = {"def"})
    public void receivedD02(Message message, Channel channel)throws Exception{
        String msg=new String(message.getBody());
        log.info("当前时间:{},消费者收到消息:{}",new Date().toString(),msg);
    }

}

3.3测试结果

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class SpringRunnerTest {
    @Resource
    private MqProducer mqProducer;//注入消息提供者
    
  	@Test
    public void test_send02() throws InterruptedException {
        // 循环发送消息
        while (true) {
            mqProducer.def_sent_test("测试默认交换机");
            Thread.sleep(3500);
        }
    }
}

在这里插入图片描述

四、扇型交换机fanout exchange

扇型交换机(fanout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。扇型用来交换机处理消息的广播路由(broadcast routing)
这个交换机上的路由键将失效
在这里插入图片描述

4.1配置类

@Configuration
public class QueueConfig {

    /**
     * 创建多个队列
     * @return
     */
    @Bean
    public Queue queue03_1(){
        return new Queue("fanout03_1");
    }
    @Bean
    public Queue queue03_2(){
        return new Queue("fanout03_2");
    }
    @Bean
    public Queue queue03_3(){
        return new Queue("fanout03_3");
    }

    /**
     * 创建一个扇形交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanoutExchange");
    }

    /**
     * 队列和扇形交换机绑定
     */
    @Bean
    public Binding binding_3_1(){
        Binding binding= BindingBuilder
                .bind(queue03_1())
                .to(fanoutExchange());
        return binding;
    }
    @Bean
    public Binding binding_3_2(){
        Binding binding= BindingBuilder
                .bind(queue03_2())
                .to(fanoutExchange());
        return binding;
    }
    @Bean
    public Binding binding_3_3(){
        Binding binding= BindingBuilder
                .bind(queue03_3())
                .to(fanoutExchange());
        return binding;
    }
}

4.2消息提供者

 @Component
public class MqProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 扇形交换机
     */
    public void fanout_sent_test(Object o){
        //convertAndSend(交换机的名字,交换机中路由键名称,参数)
        rabbitTemplate.convertAndSend(
                "fanoutExchange",
                "",//扇形交换机也没有路由建
                o);

    }

}

注意:扇形交换机也没有路由key 也用空字符串

4.3消息消费者

@Component
@Slf4j
public class MqConsumer {

    @RabbitListener(queues = {"fanout03_1"})
    public void receivedD03_1(Message message, Channel channel)throws Exception{
        String msg=new String(message.getBody());
        log.info("绑定队列一 当前时间:{},消费者收到消息:{}",new Date().toString(),msg);
    }
    @RabbitListener(queues = {"fanout03_2"})
    public void receivedD03_2(Message message, Channel channel)throws Exception{
        String msg=new String(message.getBody());
        log.info("绑定队列二 当前时间:{},消费者收到消息:{}",new Date().toString(),msg);
    }
    @RabbitListener(queues = {"fanout03_3"})
    public void receivedD03_3(Message message, Channel channel)throws Exception{
        String msg=new String(message.getBody());
        log.info("绑定队列三 当前时间:{},消费者收到消息:{}",new Date().toString(),msg);
    }
}

4.4测试类

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class SpringRunnerTest {
    @Resource
    private MqProducer mqProducer;//注入消息提供者
   
    @Test
    public void test_send03() throws InterruptedException {
    	int a=1;
        // 循环发送消息
        while (true) {
            mqProducer.fanout_sent_test("测试扇形交换机 第"+ a++ +"次循环");
            Thread.sleep(3500);
        }
    }
}

在这里插入图片描述

五、主题交换机topic exchanges

主题交换机(topic exchanges)通过对消息的路由键队列到交换机的绑定模式之间的匹配,将消息路由 一个或多个队列。主题交换机经常用来实现各种分发/订阅模式及其变种。主题交换机通常用来实现消息的多播路由(multicast routing)。

在这里插入图片描述

5.1配置类

@Configuration
public class QueueConfig {

    /**
     * 创建;两个队列
     */
    @Bean
    public Queue topicQueue_1(){
        return new Queue("topicQueue_1");
    }
    @Bean
    public Queue topicQueue_2(){
        return new Queue("topicQueue_2");
    }
    /**
     * 创建主题交换机
     */
    @Bean
    public TopicExchange TopicExchange(){
        return new TopicExchange("TopicExchange");
    }
    /**
     * 根据不同的key绑定不同的队列
     */
    @Bean
    public Binding bindingTopicExchange_1(){
        Binding binding= BindingBuilder
                .bind(topicQueue_1())
                .to(TopicExchange()).with("key1");
        return binding;
    }
    @Bean
    public Binding bindingTopicExchange_2(){
        Binding binding= BindingBuilder
                .bind(topicQueue_2())
                .to(TopicExchange()).with("key2");
        return binding;
    }
}

5.2消息提供者

@Component
public class MqProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;
    /**
     * 主题交换机
     */
    public void topic_sent_test(Object o,String key){
        rabbitTemplate.convertAndSend(
                "TopicExchange",
                key, //后面动态的传递key
                o);
    }
}

5.3消息消费者

@Component
@Slf4j
public class MqConsumer1 {
    /**
     * 接收消息
     */
    @RabbitListener(queues = {"topicQueue_1"})
    public void topicQueue_1(Message message, Channel channel)throws Exception{
        String msg=new String(message.getBody());
        log.info("队列一 当前时间:{},消费者收到消息:{}",new Date().toString(),msg);
    }

    @RabbitListener(queues = {"topicQueue_2"})
    public void topicQueue_2(Message message, Channel channel)throws Exception{
        String msg=new String(message.getBody());
        log.info("队列二 当前时间:{},消费者收到消息:{}",new Date().toString(),msg);
    }

}

5.4测试

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class SpringRunnerTest {
    @Resource
    private MqProducer mqProducer;//注入消息提供者
   @Test
    public void test_send04() throws InterruptedException {
        // 循环发送消息
        int a=1;
        while (true) {

            if(a%2 == 0){
                mqProducer.topic_sent_test("!!给队列二的消息==第"
                + a++ +"次循环","key2");
            }else{
                mqProducer.topic_sent_test("!!给队列一的消息==第"
                + a++ +"次循环","key1");
            }

            Thread.sleep(3500);
        }
    }
}

在这里插入图片描述

使用案例:

  • 分发有关于特定地理位置的数据,例如销售点
  • 由多个工作者(workers)完成的后台任务,每个工作者负责处理某些特定的任务
  • 股票价格更新(以及其他类型的金融数据更新)
  • 涉及到分类或者标签的新闻更新(例如,针对特定的运动项目或者队伍)
  • 云端的不同种类服务的协调
  • 分布式架构/基于系统的软件封装,其中每个构建者仅能处理一个特定的架构或者系统。

六、头交换机 headers exchange

有时消息的路由操作会涉及到多个属性,此时使用消息头就比用路由键更容易表达,头交换机(headers exchange)就是为此而生的。头交换机使用多个消息属性代替 路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
在这里插入图片描述

6.1配置类

@Configuration
public class QueueConfig {
	 /**
     * 创建2个队列
     */
    @Bean(name = "headersQ1")
    public Queue queue1() {
        return new Queue("headersQ1");
    }
    @Bean(name = "headersQ2")
    public Queue queue2() {
        return new Queue("headersQ2");
    }

    /**
     * 创建交换机
     * @return
     */
    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange("headersExchange");
    }
    /**
     * 绑定交换机和队列
     */
    @Bean
    public Binding binding1() {
        HashMap<String, Object> header = new HashMap<>();
        header.put("queue", "queue1");
        header.put("bindType", "whereAll");
        return BindingBuilder
                .bind(queue1())
                .to(headersExchange())
                .whereAll(header).match();
    }
    @Bean
    public Binding binding2() {
        HashMap<String, Object> header = new HashMap<>();
        header.put("queue", "queue2");
        header.put("bindType", "whereAny");
        return BindingBuilder
                .bind(queue2())
                .to(headersExchange())
                .whereAny(header).match();
    }
}

6.2创建消息提供者

@Component
public class MqProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 头交换机
     * @param msg
     */
    public void headers_send(String msg,int a) {
        //a用来控制头信息 达到传递给不同的队列效果

        MessageProperties messageProperties = new MessageProperties();
        if(  a % 3 ==0){
            messageProperties.setHeader("queue", "queue2");
            messageProperties.setHeader("bindType", "whereAny");
        }else{
            messageProperties.setHeader("queue", "queue1");
            messageProperties.setHeader("bindType", "whereAll");
        }


        Message message = new Message(msg.getBytes(), messageProperties);
        rabbitTemplate.convertAndSend("headersExchange", null, message);

    }
}

6.3消息消费者

@Component
@Slf4j
public class MqConsumer1 {
    /**
     * 接收消息
     */
    @RabbitListener(queues = "headersQ1")
    public void receive1(String msg) {
        log.info("接收到 headersQ1 发送的消息:" + msg);
    }

    @RabbitListener(queues = "headersQ2")
    public void receive2(String msg) {
        log.info("接收到 headersQ2 发送的消息:" + msg);
    }
  }

6、4测试结果

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class SpringRunnerTest {
    @Resource
    private MqProducer mqProducer;//注入消息提供者
    
    @Test
    public void test_headers_send() throws InterruptedException {
        // 循环发送消息
        int a=1;
        while (true) {
            mqProducer.headers_send("消息"+a,a++);
            Thread.sleep(3500);
        }
    }
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/757440.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于梯度下降的线性回归(Gradient Descent For Linear Regression)

概述&#xff1a; 梯度下降是很常用的算法&#xff0c;它不仅被用在线性回归上和线性回归模型、平方误差代价函数。在本次&#xff0c;我们要将梯度下降和代价函数结合。我们将用到此算法&#xff0c;并将其应用于具体的拟合直线的线性回归算法里。 梯度下降算法和线性回归算法…

Cell 子刊 | 深度睡眠脑电波调节胰岛素敏感性促进血糖调节

缺乏高质量的睡眠会增加一个人患糖尿病的风险。然而&#xff0c;为什么会这样仍然是一个不解之谜。 近期&#xff0c;加州大学伯克利分校的一组睡眠科学家的新发现为我们揭示了答案。研究人员在人体内发现了一种潜在的调控机制&#xff0c;解释了为什么夜间深度睡眠脑电波能够调…

数据结构(王道)——线性表之静态链表顺序表和链表的比较

一、静态链表 定义&#xff1a; 代码实现&#xff1a; 如何定义一个静态链表 静态链表的基本操作思路&#xff1a; 初始化静态链表&#xff1a; 静态链表的查找、插入、删除 静态链表总结&#xff1a; 二、顺序表和链表的比较 逻辑结构对比&#xff1a; 存储结构对比&#xff…

golang关于成员变量使用:=

错误 错误原因 结构体成员变量不能与:一起用&#xff0c;这是一个语法错误。

Mybatis架构简介

文章目录 1.整体架构图2. 基础支撑层2.1 类型转换模块2.2 日志模块2.3 反射工具模块2.4 Binding 模块2.5 数据源模块2.6缓存模块2.7 解析器模块2.8 事务管理模块3. 核心处理层3.1 配置解析3.2 SQL 解析与 scripting 模块3.3 SQL 执行3.4 插件4. 接口层1.整体架构图 MyBatis 分…

SpringMVC【SpringMVC参数获取、SpringMVC处理响应】(二)-全面详解(学习总结---从入门到深化)

目录 SpringMVC参数获取_使用Servlet原生对象获取参数 SpringMVC参数获取_自定义参数类型转换器 SpringMVC参数获取_编码过滤器 SpringMVC处理响应_配置视图解析器 SpringMVC处理响应_控制器方法的返回值 SpringMVC处理响应_request域设置数据 SpringMVC处理响应_sessi…

【动手学深度学习】--02.Softmax回归

文章目录 Softmax回归1.原理1.1 从回归到多类分类1.2三种常见的损失函数 2.图像分类集2.1读取数据集2.2读取小批量2.3整合组件 3.从零实现Softmax回归3.1初始化模型参数3.2定义softmax操作3.3定义模型3.4定义损失函数3.5分类精度3.6训练3.7预测 4.softmax回归的简洁实现4.1初始…

计网笔记--应用层

1--网络程序的组织方式和关系 网络应用程序在各种端系统上的组织方式及其关系主要有两种&#xff1a; 客户/服务器方式&#xff08;C/S方式&#xff09;和对等方式&#xff08;P2P方式&#xff09;&#xff1b; 2--动态主机配置协议&#xff08;DHCP&#xff09; 动态主机配置协…

26 sigmoid Belief Network

文章目录 26 Sigmoid Belief Network26.1 背景介绍26.2 通过log-likelihood推断SBN的后验26.3 醒眠算法——Wake Sleep Algorithm 26 Sigmoid Belief Network 26.1 背景介绍 什么是Sigmoid Belief Network&#xff1f;Belief Network等同于Bayesian Network&#xff0c;表示有…

新手如何自学PostgreSQL(PG)

如果你是一个新手&#xff0c;想要自学PostgreSQL&#xff0c;下面是一些步骤和资源&#xff0c;可以帮助你入门&#xff1a; ①了解数据库基础知识&#xff1a;在开始学习PostgreSQL之前&#xff0c;建议你先了解一些数据库的基础概念和术语&#xff0c;例如表、列、行、SQL查…

【Elasticsearch】搜索结果处理和RestClient查询文档

目录 2.搜索结果处理 2.1.排序 2.1.1.普通字段排序 2.1.2.地理坐标排序 2.2.分页 2.2.1.基本的分页 2.2.2.深度分页问题 2.2.3.小结 2.3.高亮 2.3.1.高亮原理 2.3.2.实现高亮 2.4.总结 3.RestClient查询文档 3.1.快速入门 3.1.1.发起查询请求 3.1.2.解析响应 …

LangChain(6)构建用户自己的Agent

构建用户自己的Agent 编写简单的计算工具编写有多个参数的工具其它更高级的工具 LangChain 中有一些可用的Agent内置工具&#xff0c;但在实际应用中我们可能需要编写自己的Agent。 编写简单的计算工具 !pip install -qU langchain openai transformersfrom langchain.tools …

Spring-Interceptor拦截器

使用步骤 申明拦截器bean&#xff0c;并实现HandlerInterceptor接口 true为放行&#xff0c;false为拦截 2.定义配置类&#xff0c;继承WebMvcConfigurationSupport&#xff0c;实现addInterceptors方法&#xff0c;该方法调用具体的拦截器进行拦截 也可以在配子类通过实现W…

HTPP入门教程||HTTP 状态码||HTTP content-type

HTTP 状态码 当浏览者访问一个网页时&#xff0c;浏览者的浏览器会向网页所在服务器发出请求。当浏览器接收并显示网页前&#xff0c;此网页所在的服务器会返回一个包含 HTTP 状态码的信息头&#xff08;server header&#xff09;用以响应浏览器的请求。 HTTP 状态码的英文为…

Springboot+Flask+Neo4j+Vue2+Vuex+Uniapp+Mybatis+Echarts+Swagger综合项目学习笔记

文章目录 Neo4j教程&#xff1a;Neo4j高性能图数据库从入门到实战 医疗问答系统算法教程&#xff1a;医学知识图谱问答系统项目示例&#xff1a;neo4j知识图谱 Vueflask 中药中医方剂大数据可视化系统可视化技术&#xff1a;ECharts、D.jsflask教程&#xff1a;速成教程Flask w…

『分割』 分割圆柱

原始点云 直通滤波过滤后&#xff08;z:0~1.5&#xff09; 分割到的平面 分割得到的圆柱形 代码&#xff1a; #include <pcl/ModelCoefficients.h> #include <pcl/io/pcd_io.h> #include <pcl/filters/extract_indices.h> // 用于提取指定索引的数据 #inclu…

伪标签(pseudo label)(半监督学习)

使用伪标签进行半监督学习&#xff0c;在机器学习竞赛当中是一个比较容易快速上分的关键点。下面给大家来介绍一下什么是基于伪标签的半监督学习。在传统的监督学习当中&#xff0c;我们的训练集具有标签&#xff0c;同时&#xff0c;测试集也具有标签。这样我们通过训练集训练…

RS485转ETHERCAT连接西门子支持ethercat吗

我们将为大家介绍一款强大的设备——远创智控YC-ECT-RS485/232通讯网关。这是一款自主研发的ETHERCAT从站功能的网关&#xff0c;它能够将ETHERCAT网络和RS485或RS232设备无缝连接。 这款网关在ETHERCAT总线和RS485或RS232总线中均能发挥主站或从站的作用。它的最大特点就是解决…

ICCV 2023 接收结果出炉!再创历史新高!录用2160篇!(附6篇最新论文)

点击下方卡片&#xff0c;关注“CVer”公众号 AI/CV重磅干货&#xff0c;第一时间送达 点击进入—>【计算机视觉】微信交流群 2023 年 7 月 14 日13:03&#xff0c;ICCV 2023 顶会论文接收结果出炉&#xff01;这次直接放出论文 Accepted Paper ID List。这也意味着&#xf…