RabbitMQ的安装与使用

news2025/1/12 10:02:00

RabbitMQ的安装与使用

  • 介绍
  • 一、RabbitMQ的安装
    • 1 查找镜像
    • 2 拉取镜像
    • 3 查看镜像
    • 4 创建容器
    • 5 查看容器
    • 6 访问测试
  • 二、RabbitMQ的使用
    • 1 创建项目
    • 2 配置文件
    • 3 队列配置文件
    • 4 消费者
    • 5 生产者
    • 6 测试
  • 三、交换器
  • 四、普通队列Demo
  • 五、死信队列Demo
    • 1 介绍
    • 2 示例
      • 2.1 配置
      • 2.2 生产者
      • 2.3 消费者
      • 2.4 死信消费者
      • 2.5 结果
  • 六、延时队列Demo
    • 1 安装延迟插件
      • 1.1 下载插件
      • 1.2 将插件拷贝到RabbitMQ容器的插件目录
      • 1.3 进入到容器
      • 1.4 开启插件
      • 1.5 查看
    • 2 示例
      • 2.1 配置
      • 2.2 生产者
      • 2.3 消费产者
      • 2.4 结果

介绍

RabbitMQ是一个在AMQP基础上完成的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。开发语言为Erlang。
linux系统中安装RabbitMQ比较繁琐,这里使用的是Docker安装。

一、RabbitMQ的安装

1 查找镜像

docker search rabbitmq:management

在这里插入图片描述

2 拉取镜像

docker pull macintoshplus/rabbitmq-management

在这里插入图片描述

3 查看镜像

docker images

在这里插入图片描述

4 创建容器

docker run -d --hostname mzw-rabbitmq --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 c20

命令解读:

  • 运行一个镜像
  • -d 后台守护运行
  • –hostname mzw-rabbitmq 指定主机名称
  • –name 指定容器名称
  • -e RABBITMQ_DEFAULT_USER=admin 指定用户名
  • -e RABBITMQ_DEFAULT_PASS=admin 指定密码
  • -p 15672:15672 -p 5672:5672 端口映射
  • c20 镜像ID 简写
    在这里插入图片描述

5 查看容器

docker ps

在这里插入图片描述

6 访问测试

访问地址:http://192.168.2.xx:15672/
在这里插入图片描述
输入启动容器时设置的用户密码登录
在这里插入图片描述
这就表示RabbitMQ安装成功了

二、RabbitMQ的使用

1 创建项目

创建SpringBoot项目并引入相关依赖
在这里插入图片描述

2 配置文件

# RabbitMQ 配置
spring.rabbitmq.name=rabbitmq-demo01
spring.rabbitmq.host=192.168.2.22
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

# 自定义一个属性,设置队列的名称
mq.queue.name=hello-queue

3 队列配置文件

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

// 添加@Configuration 注解,表示一个注解类
@Configuration
public class QueueConfig {

    @Value("${mq.queue.name}")
    private String queueName;

    /**
     * 初始化短信队列
     * @return
     */
    @Bean
    public Queue delayedSmsQueueInit() {
        return new Queue(queueName);
    }
}

4 消费者

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 创建一个rabbitmq消费者
 */
@Component
public class Receiver {

    // 接受MQ消息 并 处理消息
    @RabbitListener(queues = {"${mq.queue.name}"})
    public void process(String msg){
        // 处理消息
        System.out.println("我是MQ消费者,我接收到的消息是:" + msg );
    }
}

5 生产者

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * 消息提供者
 */
@Component
public class Sender {

    @Autowired
    private AmqpTemplate template;

    @Value("${mq.queue.name}")
    private String queueName;

    // 发送消息
    public void send(String msg){
        // 队列名,消息内容
        template.convertAndSend(queueName,msg);
    }

}

6 测试

  • 发送消息
    @Autowired
    private Sender sender;
    @Test
    void contextLoads() {
        sender.send("你好啊......");
    }
    
  • 接收消息
    在这里插入图片描述

三、交换器

RabbitMQ中有五种主要的交互器分别如下

交换器说明
direct发布与订阅 完全匹配
fanout广播
topic主体,规则匹配
fanout转发
custom自定义

四、普通队列Demo

上边已经演示,这里不重复演示。

五、死信队列Demo

1 介绍

死信队列就是在某种情况下,导致消息无法被正常消费(异常,过期,队列已满等),存放这些未被消费的消息的队列即为死信队列

2 示例

2.1 配置

  • 配置文件
# RabbitMQ 配置
spring.rabbitmq.name=rabbitmq-demo01
spring.rabbitmq.host=192.168.2.22
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin


###死信队列
mq.dlx.exchange=mq_dlx_exchange
mq.dlx.queue=mq_dlx_queue
mq.dlx.routingKey=mq_dlx_key
###备胎交换机
mq.exchange=mq_exchange
mq.queue=mq_queue
mq.routingKey=routing_key
  • 配置类
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class MQConfig {
    /**
     * 普通交换机
     */
    @Value("${mq.exchange}")
    private String mqExchange;

    /**
     * 普通队列
     */
    @Value("${mq.queue}")
    private String mqQueue;

    /**
     * 普通路由key
     */
    @Value("${mq.routingKey}")
    private String mqRoutingKey;
    /**
     * 死信交换机
     */
    @Value("${mq.dlx.exchange}")
    private String dlxExchange;

    /**
     * 死信队列
     */
    @Value("${mq.dlx.queue}")
    private String dlxQueue;
    /**
     * 死信路由
     */
    @Value("${mq.dlx.routingKey}")
    private String dlxRoutingKey;

    /**
     * 声明死信交换机
     * @return DirectExchange
     */
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(dlxExchange);
    }

    /**
     * 声明死信队列
     * @return Queue
     */
    @Bean
    public Queue dlxQueue() {
        return new Queue(dlxQueue);
    }

    /**
     * 声明普通业务交换机
     * @return DirectExchange
     */
    @Bean
    public DirectExchange mqExchange() {
        return new DirectExchange(mqExchange);
    }

    /**
     * 声明普通队列
     * @return Queue
     */
    @Bean
    public Queue mqQueue() {
        // 普通队列绑定我们的死信交换机
        Map<String, Object> arguments = new HashMap<>(2);
        //死信交换机
        arguments.put("x-dead-letter-exchange", dlxExchange);
        //死信队列
        arguments.put("x-dead-letter-routing-key", dlxRoutingKey);
        return new Queue(mqQueue, true, false, false, arguments);
    }

    /**
     * 绑定死信队列到死信交换机
     * @return Binding
     */
    @Bean
    public Binding binding(Queue dlxQueue,DirectExchange dlxExchange) {
        return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(dlxRoutingKey);
    }


    /**
     * 绑定普通队列到普通交换机
     * @return Binding
     */
    @Bean
    public Binding mqBinding(Queue mqQueue,DirectExchange mqExchange) {
        return BindingBuilder.bind(mqQueue).to(mqExchange).with(mqRoutingKey);
    }
}

2.2 生产者

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;

/**
 * 生产者
 */
@RestController
@Slf4j
public class MQProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 普通交换机
     */
    @Value("${mq.exchange}")
    private String mqExchange;
    /**
     * 普通路由key
     */
    @Value("${mq.routingKey}")
    private String mqRoutingKey;

    @RequestMapping("/sendMsg")
    public String sendMsg() {
        String msg = "Hello RabbitMQ ......";
        //发送消息  参数一:交换机 参数二:路由键(用来指定发送到哪个队列)
        rabbitTemplate.convertAndSend(mqExchange, mqRoutingKey, msg, message -> {
            // 设置消息过期时间 10秒过期    如果过期时间内还没有被消费 就会发送给死信队列
            message.getMessageProperties().setExpiration("10000");
            return message;
        });
        log.info("生产者发送消息:{}", msg);
        return "success";
    }
}

2.3 消费者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消费者
 */
@Component
@Slf4j
public class MQConsumer {

    /**
     * 监听队列回调的方法
     *
     * @param msg
     */
    @RabbitListener(queues = {"${mq.queue}"})
    public void mqConsumer(String msg) {
        log.info("正常普通消费者消息MSG:{}", msg);
    }
}

2.4 死信消费者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 死信消费者
 */
@Component
@Slf4j
public class MQDlxConsumer {
    /**
     * 死信队列监听队列回调的方法
     *
     * @param msg
     */
    @RabbitListener(queues = {"${mq.dlx.queue}"})
    public void mqConsumer(String msg) {
        log.info("死信队列消费普通消息:msg{}", msg);
    }

}

2.5 结果

访问:http://127.0.0.1:9023/sendMsg 会被 消费者 消费掉
消费者 代码注释掉,在访问http://127.0.0.1:9023/sendMsg,等待10秒钟后会被死信队列接收到。
在这里插入图片描述

六、延时队列Demo

  • 两种方式:
    • 第一种:使用死信队列,将消息放入一个没有被监听的队列上,设置TTL(一条消息的最大存活时间)为延迟的时间,时间到了没有被消费,直接成为死信。监听死信队列来进行操作。
    • 第二种:使用rabbitmq官方提供的delayed插件来真正实现延迟队列。本文对第二种进行详解

1 安装延迟插件

官网下载:https://www.rabbitmq.com/community-plugins.html
我的RabbitMQ是3.12 b版本的,下载此插件
在这里插入图片描述

1.1 下载插件

wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.12.0/rabbitmq_delayed_message_exchange-3.12.0.ez

在这里插入图片描述

1.2 将插件拷贝到RabbitMQ容器的插件目录

docker cp ./rabbitmq_delayed_message_exchange-3.12.0.ez de24369edeb4:/plugins

在这里插入图片描述

1.3 进入到容器

docker exec -it de24369edeb4 /bin/bash

1.4 开启插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

在这里插入图片描述

1.5 查看

rabbitmq-plugins list

E* 或 e* 代表 插件已启用
在这里插入图片描述
在RabbitMQ控制台可以看到
在这里插入图片描述

2 示例

2.1 配置

  • 配置文件
# RabbitMQ 配置
spring.rabbitmq.name=rabbitmq-demo01
spring.rabbitmq.host=192.168.2.22
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

# 自定义一个属性,设置队列的名称
mq.queue.name=hello-queue
  • 配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 使用x-delayed-message 延时队列插件
 */
@Configuration
public class QueueConfig {

    @Value("${mq.queue.name}")
    private String queueName;

    /**
     * 初始化短信队列
     * @return
     */
    @Bean
    public Queue delayedSmsQueueInit() {
        return new Queue(queueName);
    }

    /**
     * 初始化延迟交换机
     * @return
     */
    @Bean
    public CustomExchange delayedExchangeInit() {
        Map<String, Object> args = new HashMap<>();
        // 设置类型,可以为fanout、direct、topic
        args.put("x-delayed-type", "direct");
        // 第一个参数是延迟交换机名字,第二个是交换机类型,第三个设置持久化,第四个设置自动删除,第五个放参数
        return new CustomExchange("delayed_exchange","x-delayed-message", true,false,args);
    }

    /**
     * 短信队列绑定到交换机
     * @param delayedSmsQueueInit
     * @param customExchange
     * @return
     */
    @Bean
    public Binding delayedBindingSmsQueue(Queue delayedSmsQueueInit, CustomExchange customExchange) {
        // 延迟队列绑定延迟交换机并设置RoutingKey为sms
        return BindingBuilder.bind(delayedSmsQueueInit).to(customExchange).with("sms").noargs();
    }
}

2.2 生产者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


/**
 * 生产者
 */
@RestController
@Slf4j
public class Sender {

    @Autowired
    private AmqpTemplate template;

    @Value("${mq.queue.name}")
    private String queueName;

    // 发送消息
    @RequestMapping("/sendMsg")
    public void send(){
        String msg = "Hello RabbitMQ ......";
        // 队列名,消息内容
        template.convertAndSend(queueName,msg);
        log.info("生产者发送消息:{}", msg);
    }

    @RequestMapping("/sendDelayedMsg")
    public void sendDelayedMsg(){
        String msg = "Hello RabbitMQ Delayed ......";
        // 第一个参数是延迟交换机名称,第二个是Routingkey,第三个是消息主题,第四个是X,并设置延迟时间,单位		是毫秒
        template.convertAndSend("delayed_exchange","sms",msg,a -> {
            a.getMessageProperties().setDelay(2000);
            return a;
        });
        log.info("生产者发送延时消息:{}", msg);
    }
}

2.3 消费产者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


/**
 * 消费者
 */
@Component
@Slf4j
public class Receiver {

    // 接受MQ消息 并 处理消息
    @RabbitListener(queues = {"${mq.queue.name}"})
    public void process(String msg){
        // 处理消息
        log.info("我是MQ消费者,我接收到的消息是:{}", msg);
    }
}

2.4 结果

访问:http://127.0.0.1:9022/sendMsg
访问:http://127.0.0.1:9022/sendDelayedMsg
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1456706.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux下多核CPU指定程序运行的核

设置程序在指定CPU核心运行 一、如何查看程序运行的CPU信息 1.1 查看当前系统CPU有几个核心 查看CPU核心数量&#xff1a;lscpu 1.2 查看程序的PID ps aux|grep cpu_test1.3 查看程序可运行的CPU taskset -c -p pid1.4 设置程序在指定核心上运行 1.4.1 通过运行时的参数设…

Halcon 图像增强(相关算法)

Halcon 图像增强(相关算法) 代码 *****1.读取图片打开窗口**************

【OpenCV学习笔记29】- OpenCV 中的直方图 - 直方图 - 3:2D 直方图

这是对于 OpenCV 官方文档中 图像处理 的学习笔记。学习笔记中会记录官方给出的例子&#xff0c;也会给出自己根据官方的例子完成的更改代码&#xff0c;同样彩蛋的实现也会结合多个知识点一起实现一些小功能&#xff0c;来帮助我们对学会的知识点进行结合应用。 如果有喜欢我笔…

成都力寰璨泓科技有限公司抖音小店购物新体验

在数字化时代&#xff0c;网购已成为人们生活中不可或缺的一部分。随着抖音等短视频平台的兴起&#xff0c;越来越多的消费者选择在抖音小店购物。成都力寰璨泓科技有限公司抖音小店&#xff0c;作为新兴的电商力量&#xff0c;凭借其可靠的品质和服务&#xff0c;正逐渐成为消…

小程序常用组件

一、tabBar tabBar的相关设置要设置在app.json中&#xff08;全局配置&#xff09;。 注意&#xff1a;tabBar中的list是数组形式&#xff0c;每一项都是以对象形式存在&#xff1b; list中对象的数量最多5个&#xff0c;最少2个&#xff1b; list中的对象的pagePath和text是必…

Docker部署Redis哨兵模式

目录结构 先按照这个目录结构创建。 redis主从配置 redis-master主配置文件 #允许远程连接 bind 0.0.0.0# 设置Redis实例的端口号 port 6379# 设置Redis实例的密码 requirepass 123456# 启用持久化 appendonly yes redis-slave1从配置文件 #允许远程连接 bind 0.0.0.0# 设…

ALINX黑金AXU3EGB 开发板用户手册RS485通信接口图示DI RO信号方向标识错误说明

MAX3485这类RS485芯片&#xff0c;DI是TTL信号输入&#xff0c;RO是TTL信号输出 如下图是MAX3485手册规格书。 因此 ALINX黑金AXU3EGB 用户手册 Page 43页 图 3-11-1 PL 端 485 通信的连接示意图&#xff0c;MAX3485芯片的DI RO信号输入输出标识方向是错误的&#xff0c;应为蓝…

【Linux】---Linux下基本指令(2)

目录 一、指令详细介绍1.1 cat 指令1.2 echo 指令1.3 more 指令1.4 less 指令1.5 head 指令1.6 tail 指令1.7 date 指令1.8 cal 指令1.9 find 指令1.10 grep 指令1.11 zip/unzip 指令1.12 tar 指令1.13 uname –r 指令&#xff1a; 一、指令详细介绍 1.1 cat 指令 语法&#…

ABINet原理讲解以及运行

论文地址&#xff1a;https://arxiv.org/pdf/2103.06495.pdf 代码地址&#xff1a;https://github.com/FangShancheng/ABINet 前言 OCR技术经历了是从传统方法到深度学习方法的一个过程&#xff0c;所以在这里我也简述一下传统的OCR技术方法。传统OCR方法在简单场景下效果良…

算法沉淀——BFS 解决最短路问题(leetcode真题剖析)

算法沉淀——BFS 解决最短路问题&#xff08;leetcode真题剖析&#xff09; 01.迷宫中离入口最近的出口02.最小基因变化03.单词接龙04.为高尔夫比赛砍树 BFS&#xff08;广度优先搜索&#xff09;是解决最短路径问题的一种常见算法。在这种情况下&#xff0c;我们通常使用BFS来…

智胜未来,新时代IT技术人风口攻略-第五版(弃稿)

文章目录 前言鸿蒙生态科普调研人员画像高校助力鸿蒙高校鸿蒙课程开设占比教研力量并非唯一原因 企业布局规划全盘接纳仍需一段时间企业对鸿蒙的一些诉求 机构入场红利机构鸿蒙课程开设占比机构对鸿蒙的一些诉求 鸿蒙实际体验高校用户群体场景分析企业用户群体场景分析培训机构…

东方博宜 1395. 小丽找数?

东方博宜 1395. 小丽找数&#xff1f; #include<iostream> using namespace std; int main() {int x ;cin >> x ;int cnt 0 ;for (int i 1 ; i < x ; i){ int y i ;int sum 0;while(y > 0){sum y%10 ;y / 10 ;}if(sum%5!0 &&sum%2!0)cnt 1 …

多线程---乐观锁、悲观锁

乐观锁&#xff08;Optimistic Locking&#xff09; 乐观锁则是一种假定数据在并发访问时很少会发生冲突的锁定策略。因此&#xff0c;乐观锁在访问数据时不会立即对数据进行加锁&#xff0c;而是在更新数据时检查数据是否被其他线程修改过。如果数据没有被修改过&#xff0c;则…

shell脚本文本三剑客grep,sed,awk

1. 正则表达式&#xff0c;又称正规表达式、常规表达式 使用字符串来描述、匹配一系列符合某个规则的字符串 正则表达式组成&#xff1a; 普通字符包括大小写字母、数字、标点符号及一些其他符号。 元字符是指在正则表达式中具有特殊意义的专用字符 man 7 regex 可以使用man手…

达梦数据库——数据迁移sqlserver-dm报错问题_未完待续

记录SQL server到达梦数据迁移过程中遇到的问题&#xff0c;持续更新中... 报错情况一&#xff1a;Sql server迁移达梦连接报错’驱动程序无法通过使用安全套接字Q层(SSL)加密与SQL Server 建立安全连接。错误:“The server selected protocol version TLS10 is not accepted b…

2024开工大吉,便宜寄快递该怎么选呢?

随着春节的结束&#xff0c;大部分人回到了工作的岗位&#xff0c;相信许多人还沉浸在过年的喜悦的氛围中呢&#xff0c;但是我们可以期盼下一个春节的到来了&#xff0c;言归正传&#xff0c;工作中总会收发快递了&#xff0c;尤其是最近&#xff0c;需要联络客户的感情了&…

vtkBoarderWidget及图片坐标包含计算

开发环境&#xff1a; Windows 11 家庭中文版Microsoft Visual Studio Community 2019VTK-9.3.0.rc0vtk-example demo解决问题&#xff1a;移动图片到坐标轴的中心&#xff0c;创建一个vtkBoarderWidget控件&#xff0c;移动控件&#xff0c;计算控件与图片的包含关系 关键点…

【linux网络的综合应用】补充网关服务器搭建,综合应用SNAT、DNAT转换,dhcp分配、dns分离解析,nfs网络共享以及ssh免密登录

实验拓朴图&#xff1a; 1&#xff09;网关服务器&#xff1a;ens36&#xff1a;12.0.0.254/24&#xff0c;ens33&#xff1a;192.168.100.254/24&#xff1b;Server1&#xff1a;192.168.100.101/24&#xff1b;PC1和server2&#xff1a;自动获取IP&#xff1b;交换机无需配置…

SSL数据加密一定能保证数据的完整性吗?

SSL数据加密是一种常见的网络安全措施&#xff0c;用于保护数据在传输过程中的安全。它通过使用加密算法将数据转换为密文&#xff0c;然后在传输过程中对数据进行保护&#xff0c;以防止数据被窃取或篡改。然而&#xff0c;尽管SSL数据加密可以提供一定程度的数据保密性&#…

光流方向以及 remap 重映射的理解

Date: 2023-09-07 省流&#xff1a;光流法计算prev 到next 的flow&#xff0c;之后flow &#xff08;加上当前位置坐标&#xff09;生成flow_map&#xff0c;利用flow_map 和OpenCV remap 函数&#xff0c;可以将next remap 得到 prev&#xff0c;即remap 后一帧得到前一帧图像…