RabbitMQ 第一天 基础 4 RabbitMQ 的工作模式 4.3 Routing 路由模式

news2024/11/19 17:45:19

RabbitMQ

【黑马程序员RabbitMQ全套教程,rabbitmq消息中间件到实战】

文章目录

      • RabbitMQ
      • 第一天 基础
      • 4 RabbitMQ 的工作模式
        • 4.3 Routing 路由模式
          • 4.3.1 模式说明
          • 4.3.2 代码编写
          • 4.3.3 小结

第一天 基础

4 RabbitMQ 的工作模式

4.3 Routing 路由模式

4.3.1 模式说明

先来看个问题

在这里插入图片描述

上次我们使用发布订阅 Pub / Sub 工作模式 完成了发送一条消息,可以被两个消费者 分别处理的例子

这里面有个不合理的地方,假设 日志信息只是等级为 info 的不是那么重要 的信息, 这种信息就没有必要存入数据库

现在的需求:同一条消息 由不同的消费者接收并去 做不同的事【比如说 info级别 的日志就打印控制台,error 级别的日志 再存入数据库】

【这就要用到 我们接下来要讲的 Routing 工作模式了】

【模式说明】

看看官网

在这里插入图片描述

队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)

消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey

Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息

在这里插入图片描述

图解:

  • P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
  • C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息
4.3.2 代码编写

【生产者】

package com.dingjiaxiong.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * ClassName: Producer_Routing
 * date: 2022/11/16 10:58
 * 发送消息
 *
 * @author DingJiaxiong
 */

public class Producer_Routing {

    public static void main(String[] args) throws IOException, TimeoutException {

        //1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2. 设置参数
        factory.setHost("xxxxxxxxxxxxxxxx"); // 服务器IP【默认本机 localhost】
        factory.setPort(5672); //端口【默认也是 5672】

        factory.setVirtualHost("/ding"); //虚拟机【 默认是 /】
        factory.setUsername("dingjiaxiong"); // 用户名【默认 guest】
        factory.setPassword("12345"); //密码【默认 guest】

        //3. 创建连接 Connection
        Connection connection = factory.newConnection();

        //4. 创建Channel
        Channel channel = connection.createChannel();

        //5. 创建交换机
        /**
         * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
         * 参数解释:
         *      1. exchange:交换机名称
         *      2. type:交换机类型
         *          【枚举】
         *           DIRECT("direct"):定向
         *           FANOUT("fanout"):扇形【广播】【发送消息到每一个与之绑定队列】
         *           TOPIC("topic"):通配符的方式
         *           HEADERS("headers"):参数匹配
         *      3. durable:是否持久化
         *      4. autoDelete:自动删除
         *      5. internal:内部使用【一般false】
         *      6. arguments:参数
         * */

        String exchangeName = "test_direct";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);

        //6. 创建队列【两个】
        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";

        channel.queueDeclare(queue1Name, true, false, false, null);
        channel.queueDeclare(queue2Name, true, false, false, null);


        //7. 绑定队列和交换机
        /**
         * queueBind(String queue, String exchange, String routingKey)
         * 参数解释:
         *      1. queue:参数
         *      2. exchange:交换机名称
         *      3. routingKey:路由key【绑定规则】[交换机 类型若为fanout, 默认为空]
         * */
        channel.queueBind(queue1Name, exchangeName, "error");
        channel.queueBind(queue2Name, exchangeName, "info");
        channel.queueBind(queue2Name, exchangeName, "error");
        channel.queueBind(queue2Name, exchangeName, "warning");

        //8. 发送消息

        String body = "这是一条日志信息...级别为info";

        channel.basicPublish(exchangeName,"info",null,body.getBytes());

        //9. 释放资源
        channel.close();
        connection.close();

    }

}

直接运行

在这里插入图片描述

OK, 消息发送完成,看看管控台

在这里插入图片描述

交换机创建了,看看绑定关系

在这里插入图片描述

没问题, 看看队列情况

在这里插入图片描述

可以看到 只有队列2 中,即设置了路由key 为 info 的队列 才有消息

【消费者】

package com.dingjiaxiong.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * ClassName: Consumer_Routing1
 * date: 2022/11/16 12:37
 *
 * @author DingJiaxiong
 */


public class Consumer_Routing1 {

    public static void main(String[] args) throws IOException, TimeoutException {

        //1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2. 设置参数
        factory.setHost("43.138.50.253"); // 服务器IP【默认本机 localhost】
        factory.setPort(5672); //端口【默认也是 5672】

        factory.setVirtualHost("/ding"); //虚拟机【 默认是 /】
        factory.setUsername("dingjiaxiong"); // 用户名【默认 guest】
        factory.setPassword("12345"); //密码【默认 guest】

        //3. 创建连接 Connection
        Connection connection = factory.newConnection();

        //4. 创建Channel
        Channel channel = connection.createChannel();

        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";


        // 接收消息【消费消息】
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * 参数解释:
         *      1. queue:队列名称
         *      2. autoAck:是否自动确认
         *      3. callback:回调对象
         * */
        Consumer consumer = new DefaultConsumer(channel){
            //回调方法,当收到消息后,会自动执行这个 方法
            /*
            * 1. consumerTag:消息的标识
            * 2. envelope:获取一些 信息【交换机、路由key...】
            * 3. properties: 配置信息
            * 4. body: 数据
            * */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag:" + consumerTag);
//                System.out.println("Exchange:" + envelope.getExchange());
//                System.out.println("RoutingKey:" + envelope.getRoutingKey());
//                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body));
                System.out.println("将日志 打印到了 控制台...");
            }
        };
        channel.basicConsume(queue2Name,true,consumer);

        // 不要关闭资源,让它一直监听

    }

}

OK,q2 让它做打印到控制台的工作,再来一个消费者 2,专门做error日志的存储数据库的操作

在这里插入图片描述

OK, 直接运行

在这里插入图片描述

OK, 效果也很明显, 存储数据库操作 并没有运行,因为路由 key 对不上,

现在再来发送一条消息error 的

在这里插入图片描述

直接运行

在这里插入图片描述

发送完成,查看控制台

在这里插入图片描述

没毛病,这样就达到了我们 最上面的 需求

【这就是 Routing 工作模式】

4.3.3 小结

Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/112424.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

React 学习笔记总结(四)

文章目录1. 创建组件流程(以及脚手架环境流程)2. 样式 的模块化3. 常用快捷生成4. 通用性 组件编码流程5. React脚手架 配置代理5.1 React 引入 ajax库5.2 第一种配置代理方式(package.json)5.3 第二种代理方式(setupProxy.js)6. React List列表效果实现7. React 消息订阅与发布…

“内卷之王”vivo:成败即将见分晓

文丨熔财经 作者|XL 12月22日&#xff0c;随着年底收官之作S16系列正式发布&#xff0c;vivo完成了自己的年度答卷。2022年&#xff0c;vivo总体风头正盛&#xff0c;尤其在第三季度一马当先稳居国内出货量榜首&#xff0c;市占比提升到20.0%&#xff0c;领先第二名3%。这是一…

elementUI中el-table每行异常高度原因排查,累死

理论上不单独设置高度的话&#xff0c;表格每一个应该是默认的高度才对&#xff0c;我说的没错吧&#xff0c; 但是请看实际情况&#xff1a; 这是默认情况下的高度为48 还有两外一个表格&#xff0c;我也没有设置高度&#xff0c;但是但是&#xff1a;这个高度竟然达到了71&…

深入理解蓝牙BLE之“无线通信的调制解调”

FSK&#xff1a; Frequency Shift Keying&#xff0c;频移键控&#xff0c;即一种允许根据数字调制信号改变载波频率而进行数据传输的数字调制技术&#xff0c;比如在BFSK中&#xff0c;二进制1和二进制0期间传输不同频率的载波信号。由于这种调制解调方式容易实现&#xff0c…

我国水产养殖行业现状及趋势分析:不断推进产业机械化高质量发展

水产养殖业主要是人类利用适宜水域养殖水产经济动植物的生产事业&#xff0c;是渔业的重要组成部分。按养殖水域&#xff0c;水产养殖可分为淡水养殖、海水养殖、浅海滩涂养殖&#xff1b;按养殖对象&#xff0c;可分为鱼类养殖、贝类养殖、虾类养殖、蟹类养殖、藻类栽培&#…

AndroidR兼容性适配指南

AndroidR Android 11 基于 Android 早期版本构建&#xff0c;增加了多种功能和更新&#xff0c;以保障用户安全并提高透明度和可控性。所有开发者都应查看隐私功能并测试他们的应用。具体影响可能会因每个应用的核心功能、目标平台和其他因素而异。 Android 11介绍 Android 1…

day 10 模拟和高精度

P1328 [NOIP2014 提高组] 生活大爆炸版石头剪刀布 #include<bits/stdc.h> using namespace std; int n, na, nb, fa, fb;//f:得分 int a[205], b[205];void fun(int ta, int tb){if(ta 0 && tb 1) fb;if(ta 1 && tb 0) fa;if(ta 0 && tb …

学习记录-mybatis+vue+elementUi实现分页条件查询

因为前端并不难&#xff0c;这里前后端就一起实现。一共分为四个步骤 步骤一 ① SQL部分 分页条件查询其实就是在分页的基础上增加条件&#xff0c;这里有两个主要的函数需要去实现&#xff0c;一个是根据分页以及条件去查询数据&#xff0c;另一个是根据条件去统计数据。 明显…

结构体嵌套函数指针

这次来记录一下结构体嵌套函数指针 这个知识点想了2天终于搞懂了。 先看代码&#xff0c;试着理解一下&#xff0c;不理解再看我后面的解释。 解释&#xff1a; 首先&#xff0c;和平常创建一个结构体一样&#xff0c;唯独不同的就是里面的变量是一个函数指针&#xff0c;关…

Spring Bean作用域

目录 什么是作用域呢 ? 那什么又是Spring Bean的作用域呢 ? Spring框架默认Bean作用域是什么呢 ? Spring Bean的作用域都有哪些呢 ? 如何设置Bean作用域 什么是作用域呢 ? 在JavaSE中,作用域就是指一个变量可生效的范围. 就比如一个变量的作用域是方法的代码块的范围…

fpga实操训练(signal tap调试)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 编写软件的同学都知道&#xff0c;如果需要调试软件的话&#xff0c;除了要学会打印信息日志之外&#xff0c;另外一个很重要的方面&#xff0c;就…

http 库的服务端实现

前言 net/http 库的客户端实现(上) net/http 库的客户端实现(下) net/http 库的服务端实现 上两篇文章介绍了 http 客户端的实现&#xff0c;这篇文章看一下服务端的实现 服务端 使用 net/http 库可以快速搭建HTTP服务&#xff0c;HTTP服务端主要包含两部分&#xff1a; …

5G无线技术基础自学系列 | 5G网络切换问题分析

素材来源&#xff1a;《5G无线网络规划与优化》 一边学习一边整理内容&#xff0c;并与大家分享&#xff0c;侵权即删&#xff0c;谢谢支持&#xff01; 附上汇总贴&#xff1a;5G无线技术基础自学系列 | 汇总_COCOgsta的博客-CSDN博客 无线通信的最大特点在于其具有移动性&a…

提高搜狗PR最好的方法与搜狗PR权重在线查询

搜狗PR是什么? 搜狗PR值全称为搜狗PageRank(网页级别)&#xff0c;是搜狗用于用来标识网页的等级、重要性的一种方法&#xff0c;是搜狗用来衡量一个网站的好坏的重要标准之一。 搜狗在揉合了诸如Title标识和Keywords标识等所有其它因素之后&#xff0c;通过PageRank来…

react学习

1. React概述 1.1 什么是react&#xff1f; React 是一个用于构建用户界面的 JavaScript 库 用户界面:HTML页面(前端) React 主要用来写HTML页面&#xff0c;或构建Web应用如果从 MVC 的角度来看&#xff0c;React 仅仅是视图层(V)&#xff0c;也就是只负责视图的染&#xff0…

Day44——Dp专题

文章目录子序列问题27.最长递增子序列28、最长连续递增序列29、最长重复子数组30、最长公共子序列31、不相交的线32、最大子序和33、判断子序列34、不同的子序列35、两个字符串的删除操作36、编辑距离37、回文子串38、最长回文子序列动态规划总结篇背包问题系列股票系列子序列系…

java论坛贴子网站ssm论坛项目发帖子网站论坛系统论坛源码

ssm开发的论坛系统&#xff0c;用户注册后可以发布帖子&#xff0c;其他人可以评论回复点赞评论和点赞回复&#xff0c;用户可以在个人中心管理自己的帖子&#xff0c;以及查看自己对他人的回复&#xff0c;和他人对自己的回复。 演示视频&#xff1a; https://www.bilibili.c…

图(Graph)的定义

图(Graph)的定义 文章目录图(Graph)的定义●图的形式化定义:G (V,E)●无向图和有向图的表示形式:● 有向图和无向图的定义●抽象数据类型定义ADT●图形结构属于复杂的非线性结构● 图由顶点的集合和边的集合构成 ●图的形式化定义:G (V,E) • 集合V(vertex):顶点的有限集合,…

多线程基础入门

文章目录前言一、认识线程&#xff08;一&#xff09;概念1.线程是什么2.为啥要有线程&#xff08;轻量级进程&#xff09;为什么线程比进程更轻量经典面试题&#xff1a;谈谈进程和线程的区别和联系3.线程的结构&#xff08;二&#xff09;第一个多线程程序&#xff08;三&…

Java中的自旋锁,手动实现一个自旋锁

自旋锁 CAS是实现自旋锁的基础&#xff0c;CAS利用CPU指令保证了操作的原子性&#xff0c;已达到锁的效果。自旋是指尝试获取锁的线程不会立即阻塞&#xff0c;而是采用循环的方式去尝试获取锁&#xff0c; 当线程发现锁被占用时&#xff0c;会不断循环判断锁的状态&#xff0…