RabbitMQ日常使用小结

news2025/1/11 12:49:09

一、使用场景

削峰、解耦、异步。
基于AMQP(高级消息队列协议)协议来统一数据交互,通过channel(网络信道)传递信息。erlang语言开发,并发量12000,支持持久化,稳定性好,集群不支持动态扩展。

RabbitMQ的基本概念

二、组成及工作流程

1.主要组成

Broker(消息代理): 消息队列服务进程,一个Broker以开多个虚拟主机(VirtualHost)。
VirtualHost(虚拟主机):虚拟主机,用于进行逻辑隔离,一个虚拟主机可以有若干个Exchange和Queue
Exchange(交换机):消息队列交换机,按一定的规则将消息路由转发到某个队列。
Queue:消息队列。

2.工作流程

RabbitMQ的工作流程

生产者发送消息流程:
1、和Broker建立TCP连接。
2、和Broker建立通道。
3、通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)。

消费者接收消息流程:
1、和Broker建立TCP连接
2、和Broker建立通道
3、监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、接收到消息。
6、ack回复。

三、交换机Exchange(默认direct)

交换机,接受消息,根据路由键发送消息到绑定的队列(不具备消息存储的能力)。

1.交换机种类

Direct: 单播直连交换机,Exchange将消息完全匹配路由键(routing key)的方式绑定消息,获取信息时也要匹配Exchange和路由键。
直连交换机
fanout: 广播式交换机(Publish/subscribe),不管消息的路由键(routing key),Exchange都会将消息转发给所有绑定的Queue。
广播/扇形交换机
topic: 主题交换机,工作方式类似于组播,Exchange会将消息转发和路由键(routing key)符合匹配模式的所有队列,如: routing_key为user.stock的Message会转发给绑定匹配模式为 *.stock 、user.stock* 、 #.user.stock.#的队列。(*表是匹配一个任意词组,#表示匹配0个或多个词组)。
主题交换机
headers: 头交换机,无Binding Key;当然也无Routing Key。根据发送的消息内容中的headers属性进行匹配。

2.交换机属性

Name:交换机名称
Durability:持久化标志,表明此交换机是否是持久化的
Auto-delete:删除标志,表明当所有队列在完成使用此exchange时,是否删除
Arguments:依赖代理本身。

3.交换机状态

持久(durable)
暂存(transient)

4.消息确认机制(ACK)

自动ACK:消息一旦被接收,消费者自动发送ACK。
手动ACK:消息接收后,不会发送ACK,需要手动调用。

四、rabbitmq 客户端的使用

1.引入依赖

       <!-- rabbitmq 客户端依赖 -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>

2.创建连接工具

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class MyRabbitMQUtils {
    public static Connection getConnel() throws Exception{
        //1 创建 ConnectionFactory
        ConnectionFactory factory = new ConnectionFactory() ;
        factory.setHost("127.0.0.1");
        //端口
        factory.setPort(5672);
        //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);
        Connection connection = factory.newConnection();
     //   Channel channel = connection.createChannel();
        return connection;
    }
}

3.生产者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {

    // 交换机名称
    private final static String EXCHANGE_NAME = "simple_exchange";
    // 队列名称
    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[]args) throws Exception{
        Connection produceConnection = MyRabbitMQUtils.getConnel();
        Channel produceChannel = produceConnection.createChannel();
        // 建立交换机(广播)
        produceChannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT,true);
        /*
         * 1、queue 队列名称
         * 2、durable 是否持久化
         * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
         * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
         * 5、arguments 参数,可以设置一个队列的扩展参数,如:可设置存活时间
         */
        produceChannel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /*
         * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")
         * 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
         * 3、props,消息的属性
         * 4、body,消息内容
         */
        for(int i=0;i<10;i++){
            String message = "生产者发布的消息---!";
            message = message+i;
            produceChannel.basicPublish(EXCHANGE_NAME, QUEUE_NAME, null, message.getBytes());
            System.out.println(" Producer 发布'" + message + "'");
        }
        //关闭通道和连接(资源关闭最好用try-catch-finally语句处理)
        produceChannel.close();
        produceConnection.close();
    }
}

4.消费者

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;

public class Comsumer {
    
    // 队列名称
    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] argv) throws Exception {

        Connection comsumerConnection = MyRabbitMQUtils.getConnel();
        Channel comsumerChannel = comsumerConnection.createChannel();
        /*
         * 参数明细
         * 1、queue 队列名称
         * 2、durable 是否持久化
         * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
         * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
         * 5、arguments 参数,可以设置一个队列的扩展参数,如:可设置存活时间
         */
        comsumerChannel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //实现消费方法
        DefaultConsumer consumer = new DefaultConsumer(comsumerChannel){
            /*
             * 当接收到消息后此方法将被调用
             * @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
             * @param envelope 信封,通过envelope
             * @param properties 消息属性
             * @param body 消息内容
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交换机
                String exchange = envelope.getExchange();
                //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
                long deliveryTag = envelope.getDeliveryTag();
                // body 即消息体
                String msg = new String(body,"utf-8");
                System.out.println("Comsumer 获得: " + msg + "!");
                // 手动 ACK
                comsumerChannel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        // 监听队列,第二个参数:是否自动进行消息确认。
        //参数:String queue, boolean autoAck, Consumer callback
        /**
         * 参数明细:
         * 1、queue 队列名称
         * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复。
         * 3、callback,消费方法,当消费者接收到消息要执行的方法。
         */
        comsumerChannel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

五、Spring中使用RabbitMQ

1.引入依赖

        <!-- AMQP 依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.2.7.RELEASE</version>
        </dependency>
        <!--springboot测试依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <version>2.5.6</version>
        </dependency>

2.更改配置

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

3.把交换机、和队列加入IOC容器中

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    // email队列
    public static final String QUEUE_EMAIL = "queue_email";

    // sms队列
    public static final String QUEUE_SMS = "queue_sms";

    // topics类型交换机
    public static final String EXCHANGE_NAME="topic.exchange";

    public static final String ROUTINGKEY_EMAIL="topic.#.email.#";

    public static final String ROUTINGKEY_SMS="topic.#.sms.#";

    //声明交换机
    @Bean(EXCHANGE_NAME)
    public Exchange exchange(){
        //durable(true) 持久化,mq重启之后交换机还在
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    //声明email队列
    /**
     *   new Queue(QUEUE_EMAIL,true,false,false)
     *   durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
     *   auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
     *   exclusive  表示该消息队列是否只在当前connection生效,默认是false
     */
    @Bean(QUEUE_EMAIL)
    public Queue emailQueue(){
        return new Queue(QUEUE_EMAIL);
    }

    //声明sms队列
    @Bean(QUEUE_SMS)
    public Queue smsQueue(){
        return new Queue(QUEUE_SMS);
    }

    //ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey
    @Bean
    public Binding bindingEmail(@Qualifier(QUEUE_EMAIL) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
    }

    //ROUTINGKEY_SMS队列绑定交换机,指定routingKey
    @Bean
    public Binding bindingSMS(@Qualifier(QUEUE_SMS) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
    }

}

4.模拟业务发送消息

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class Send {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void sendMsgByTopics(){

        /**
         * 参数:
         * 1、交换机名称
         * 2、routingKey
         * 3、消息内容
         */
        for (int i=0;i<5;i++){
            String message = "恭喜您,注册成功!userid="+i;
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"topic.sms.email",message);
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

5.消息的监听及处理

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Receive {

    //监听邮件队列
    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue_email", durable = "true"),
            exchange = @Exchange(value = "topic.exchange", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC),
            key = {"topic.#.email.#","email.*"}))
    public void rece_email(String msg){
        System.out.println(" [邮件服务] received : " + msg + "!");
    }

    //监听短信队列
    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue_sms", durable = "true"),
            exchange = @Exchange(value = "topic.exchange", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC),
            key = {"topic.#.sms.#"}))
    public void rece_sms(String msg){
        System.out.println(" [短信服务] received : " + msg + "!");
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/548618.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

可见性原子性有序性的+线程传参的方式+Java如何实现多个线程之间共享数据+线程间通信+死锁产生

//为了均衡CPU和内存的速度差异,增加了缓存 导致了可见性的问题; //操作系统增加了进程 线程 分时复用CPU,均衡CPU和io设备的速速差异 导致了原子性问题; //jvm指令重排序(优化指令排序) 导致了有序性的问题 可见性问题是指 线程A修改共享变量,修改后CPU缓存中的数据没有及时同…

Emacs之目前最快补全插件lsp-bridge(八十八)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 人生格言&#xff1a; 人生…

数据分析12——Pandas中数据合并方法

0、前言&#xff1a; 在pandas中进行数据合并的操作和数据库中的join操作非常类似。 1、merge横向合并&#xff1a; 前言&#xff1a;该函数只能做横向合并函数名&#xff1a;merge()函数参数&#xff1a; left: 数据类型为’DataFrame | Series’&#xff0c;需要进行合并的…

[CTF/网络安全] 攻防世界 PHP2 解题详析

[CTF/网络安全] 攻防世界 PHP2 解题详析 index.php.phps扩展名姿势 翻译&#xff1a;你能给这个网站进行身份验证吗&#xff1f; index.php index.php是一个常见的文件名&#xff0c;通常用于Web服务器中的网站根目录下。它是默认的主页文件名&#xff0c;在访问一个网站时&am…

说说计算这事儿:从开关到人工智能

目录 一 前言 二 计算历史 三 计算探秘 四 算力优化 五 未来展望 一 前言 计算本身其实是一个比较抽象的词&#xff0c;或者说比较笼统。很多场景都可能用到计算这个词&#xff0c;因此具体的含义就需要根据上下文来确定。今天我们讨论的计算&#xff0c;是比较狭义的计算…

【环境准备】在虚拟机的Ubuntu下安装VS Code并配置C/C++运行环境

1.点击进入 vscode官网 下载.deb安装包 2.启动虚拟机下的Ubuntu&#xff0c;Windows下的Xftp和Xshell Xftp&#xff1a;用于将刚刚在Windows下下载好的vscode.deb安装包传输到Ununtu中。Xshell&#xff1a;用于远程登录Ununtu&#xff0c;进行 vscode.deb 安装包安装&#xff…

算法26:递归练习

目录 题目1&#xff1a;给你一个字符串&#xff0c;要求打印打印出这个字符串的全部子序列&#xff08;子序列不能重复&#xff09; 题目2&#xff1a;打印一个字符串的全部排列。 题目3&#xff1a;针对题目2&#xff0c;要求去除重复元素 题目4&#xff1a;给定一个字符串…

ARM的读写内存指令与栈的应用

1.基础读写指令 写内存指令&#xff1a;STR MOV R1, #0xFF000000 MOV R2, #0x40000000 STR R1, [R2] 将R1寄存器中的数据写入到R2指向的内存空间 需注意&#xff0c;此命令是将R1中的数据写给R2所指向的内存空间&#xff0c;而不是直接把R1的数据赋给R2&#xff0c;R2寄存器…

chatgpt赋能Python-python3_9如何安装

Python 3.9 安装教程 Python 是一款非常流行的编程语言&#xff0c;而 Python 3.9 是其中的最新版本。不过&#xff0c;有些人可能会遇到一些问题&#xff0c;因为这是一个新版本。在本篇文章中&#xff0c;我们将介绍 Python 3.9 的安装过程&#xff0c;并提供一些关键的步骤…

无线通信网 - 动态主机配置协议 DHCP

文章目录 1 概述2 DHCP2.1 工作原理2.2 报文类型 3 扩展3.1 网工软考真题 1 概述 #mermaid-svg-VTnvU3Vd01Y4gppz {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-VTnvU3Vd01Y4gppz .error-icon{fill:#552222;}#merm…

[CTF/网络安全] 攻防世界 Training-WWW-Robots 解题详析

[网络安全] 攻防世界 Training-WWW-Robots 解题详析 在这个小训练挑战中&#xff0c;你将学习 Robots_exclusion_standard&#xff08;机器人排除标准&#xff09;。 robots.txt 文件是由网络爬虫用来检查是否允许他们爬行和索引你的网站或仅部分内容。有时这些文件揭示目录结构…

Vivado HLS 第1讲 软件工程师该怎么了解FPGA架构

Vivado HLS是将基于C/C++描述的算法转化成相应的RTL代码,最终在FPGA上实现。这就要求软件工程师对FPGA的内部架构有一些基本的认识,目的在于保证生成的RTL代码在性能和资源上能够达到很好的平衡。实际上,C语言与FPGA是有一些对应关系的。比如: C语言中的数组可对应于FPGA中…

直方图与直方图均衡化

直方图 图像直方图是用来表现图像中亮度分布的直方图&#xff0c;给出的是图像中某个亮度或者某个范围亮度下共有几个像素&#xff0c;即统计一幅图某个亮度像素数量。 直方图作为一种简单有效的基于统计特性的特征描述子&#xff0c;在计算机视觉领域广泛使用。 它的优点主要…

上下文无关文法、句柄、正规文法、规范推导、文法二义性

目录 上下文无关文法 句柄 正规文法 规范推导 文法二义性 上下文无关文法 上下文无关文法&#xff08;Context-Free Grammar&#xff0c;CFG&#xff09;是一种形式语言&#xff0c;用于描述一类语言的语法结构。它由一组产生式规则组成&#xff0c;每个规则定义了如何将一…

hackthebox htb interface:CVE-2022-28368

本题考察:CVE-2022-28368 CVE-2022-28368 - 通过远程 CSS 字体缓存安装的 RCE 参考: https://www.0le.cn/archives/58.htmlhackthebox-interface信息搜集nmap扫描端口发现开放的22和80PORT STATE SERVICE REASON22/tcp open ssh syn-ac...https://www.0le.cn/archives/58.htm…

Spring 经典面试题总结

❤ 作者主页&#xff1a;欢迎来到我的技术博客&#x1f60e; ❀ 个人介绍&#xff1a;大家好&#xff0c;本人热衷于Java后端开发&#xff0c;欢迎来交流学习哦&#xff01;(&#xffe3;▽&#xffe3;)~* &#x1f34a; 如果文章对您有帮助&#xff0c;记得关注、点赞、收藏、…

learn_C_deep_12 (深度理解“取整“、“取余“、“取模“运算、掌握运算符优先级 )

目录 关于“取整” "取整"规则 1、向零取整 2、向-∞取整 3、向∞取整 4、四舍五入 关于"取模和取余" 运算符优先级 关于“取整” #include <stdio.h> int main() {//本质是向0取整int i -2.9;int j 2.9;printf("%d\n", i); /…

【C++】 排列与组合算法详解(进阶篇)

文章目录 写在前面算法1&#xff1a;朴素算法思路缺点 算法2&#xff1a;递推预处理思路时间复杂度&#xff1a; O ( n 2 ) O(n^2) O(n2) 算法3&#xff1a;阶乘逆元思路时间复杂度&#xff1a; O ( n log ⁡ n ) O(n \log n) O(nlogn)思考&#xff1a;读者也可以尝试写 O ( n…

PySide6/PyQT多线程之 多线程 与 线程池的模板(拿来即用)

前言 关于PySide6/PyQT多线程系列的最后一篇。写这篇文章的动机是方便后续代码的直接复用。 本篇文章实际是水文&#xff0c;给出了 PySide6/PyQT的多线程以及线程池的基础使用模板&#xff0c;方便后面有需要时候直接拿来就用。 多线程 这里分两种情况来谈论&#xff0c;有返…

[Hadoop]MapReduce与YARN

目录 大数据导论与Linux基础 Apache Hadoop、HDFS MapReduce MapReduce思想 MapReduce设计构思 MapReduce介绍 MapReduce官方实例 Map阶段执行流程 Reduce阶段执行流程 shuffle机制 YARN YARN介绍 YARN架构、组件 程序提交YARN交互流程 YARN资源调度器Scheduler…