一文搞清RabbitMQ的部署运维及使用

news2024/11/18 4:43:54

1.通过docker-compose安装RabbitMQ

1.0 初始化yum和Docker

yum update
yum install epel-release -y
yum clean all
yum list
yum install docker-io -y

1.1 dockerfile

FROM rabbitmq:management
MAINTAINER LCJ
# 添加插件到指定目录 可按照此方式自行扩展其他插件
# ADD ./rabbitmq_delayed_message_exchange-3.11.1.ez /plugins
# 开启管理界面插件
RUN rabbitmq-plugins enable rabbitmq_management
# 开启延迟队列插件
#RUN rabbitmq-plugins enable rabbitmq_delayed_message_exchange ENTRYPOINT ["rabbitmq-server"]

1.2 docker-compose.yml(单节点)

version: '3.5' 
services:
  rabbitmq:
    image: rabbitmq:3.11-alpine #镜像版本
    hostname: rabbit_1
    container_name: rabbitmq
    restart: always
    build:
      context: .
    ports:
      - "15672:15672"
      - "5672:5672"
    volumes:
      - ./data:/var/lib/rabbitmq
      - ./log:/var/log/rabbitmq
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=123456
    network_mode: "bridge"

1.3服务器允许挂载的文件写

chmod +x  /root/rabbitmq/data
chmod +x /root/rabbitmq/log

1.3 rabbitMQ起起来进入manage界面

在这里插入图片描述

2 RabbitMQ基础知识简介

2.1概念

  • Broker:服务端程序,一个mq节点就是一个broker
  • Producer生产者:创建一个Message,然后发布到RabbitMQ中
  • Consumer消费者:消费队列里的消息
  • Message消息:生产消费的内容,有消息体,消息头,也包括多个属性,比如rountingKey路由键
  • Queue队列:是RabbitMQ的内部对象,用于存储消息,消息智能存储在队列中
  • Channel信道:支持多路复用的通道,独立双向数据流通,可以发布,订阅,接收消息
  • Connection链接Lsocket链接,封装了socket协议逻辑,一个链接上可以有多个channel
  • Exchange交换器:生产者将消息发送到exchange,交换机将消息路由到队列中,队列和交换机是多对多的关系。DIrect Exchange(定向) Fanout Exchange(广播) Tocpic Exchange通配符
  • RoutingKey 路由键:生产者将消息发送给交换机时,会指定RoutingKey,用来指定消息的路由规则
  • Binding绑定:通过绑定交换机与队列关联起来,在绑定时,会指定一个BindingKey,可以正确的将消息路由到队列。
  • Virtual host虚拟主机:独立的exchange和queue

2.2 RabbitMQ工作模式

  • 简单模式(单生产,单消费)
  • 工作队列(单生产,多消费,可以有轮询和公平策略) channel.basicQos(1)
  • fanout交换机,fanout交换机直接转发到队列,不需要指定routingkey
  • 路由模式:direct交换机,交换机和队列绑定,指定路由键,生产者发送消息到交换机,交换机根据信息的路由key进行转发到队列,消息要指定routingkey路由键
  • 通配符模式:交换机与队列绑定,指定通配符路由键,生产者发送消息到交换机,交换机根据消息的路由key转发到队列,消息要指定routingkey路由键

3.SpringBoot2.X+SpringAMQP整合RabbitMQ实战

3.1 AMQP依赖

 <!--引入AMQP-->
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-amqp</artifactId>
       </dependency>

server:
  port: 8080

#消息队列
spring:
  rabbitmq:
    host: xx.xx.xx
    virtual-host: /dev
    password: 123456
    username: admin
    #开启消息二次确认,生产者到broker的交换机
    publisher-confirm-type: correlated


    #开启消息二次确认,交换机到队列的可靠性投递
    publisher-returns: true
    #为true,则交换机处理消息到路由失败,则会返回给生产者
    template:
      mandatory: true

    #消息手工确认ACK
    listener:
      simple:
        acknowledge-mode: manual

3.2 主题模式开发

RabbitM@Config文件

@Configuration
public class RabbitMQConfig {public static final String EXCHANGE_NAME = "order_exchange";
    public static final String QUEUE_NAME = "order_queue";
    /**
     * 交换机
     * @return
     */
    @Bean
    public Exchange orderExchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
        //return new TopicExchange(EXCHANGE_NAME, true, false);
    }/**
     * 队列
     * @return
     */
    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
        //return new Queue(QUEUE_NAME, true, false, false, null);
    }/**
     * 交换机和队列绑定关系
     */
    @Bean
    public Binding orderBinding(Queue queue, Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
        //return new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, "order.#", null);
    }}

生产者;

@SpringBootTest
class DemoApplicationTests {@Autowired
  private RabbitTemplate template;@Test
  void send() {
    template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new","新订单来啦1");
  }
}

消费者

@Component
@RabbitListener(queues = "order_queue")
public class OrderMQListener {/**
     * RabbitHandler 会自动匹配 消息类型(消息自动确认)
     * @param msg
     * @param message
     * @throws IOException
     */
    @RabbitHandler
    public void releaseCouponRecord(String msg, Message message) throws IOException {long msgTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("msgTag="+msgTag);
        System.out.println("message="+message.toString());
        System.out.println("监听到消息:消息内容:"+message.getBody());}}

3.3 消息可靠性投递

3.3.1 什么是消息可靠性投递

  • 保证消息送到消息队列中
  • 保证mq节点接收消息
  • 消息发送端到mq服务端接收到消息的确认应答
  • 完善的消息补偿机制,发送失败二次处理

3.3.2 RabbitMQ的消息可靠性投递

生产者-> 交换机 -> 队列 -> 消费者
通过两个节点控制消息可靠性投递

  • 生产者->交换机:confirmCallBack
  • 交换机-> 队列 -> returnCallBack

在这里插入图片描述

@Autowired
  private RabbitTemplate template;@Test
  void testConfirmCallback() {
​
    template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
      /**
       *
       * @param correlationData 配置
       * @param ack 交换机是否收到消息,true是成功,false是失败
       * @param cause 失败的原因
       */
      @Override
      public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("confirm=====>");
        System.out.println("confirm==== ack="+ack);
        System.out.println("confirm==== cause="+cause);//根据ACK状态做对应的消息更新操作 TODO
      }
    });
    template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME+,"order.new","新订单来啦1");
  }
 @Test
  void testReturnCallback() {
    //为true,则交换机处理消息到路由失败,则会返回给生产者
    //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息
    template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
      @Override
      public void returnedMessage(ReturnedMessage returned) {
        int code = returned.getReplyCode();
        System.out.println("code="+code);
        System.out.println("returned="+returned.toString());
      }
    });
    template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"xxx.order.new","新订单来啦11");
  }

4 Rabbitmq的消息确认机制ACK实战+DeliveryTag介绍

spring:
  rabbitmq:
    #开启手动确认消息,如果消息重新入队,进行重试
    listener:
      simple:
        acknowledge-mode: manual
@RabbitHandler
public void releaseCouponRecord(String body, Message message, Channel channel) throws IOException {
        long msgTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("msgTag="+msgTag);
        System.out.println("message="+message.toString());
        System.out.println("body="+body);//成功确认,使用此回执方法后,消息会被 rabbitmq broker 删除
        //channel.basicAck(msgTag,false);
        //channel.basicNack(msgTag,false,true);}

5 RabbitMQ延迟队列主题模式综合实战

需求:商家新建账户检查是否在规定时间内商家商品,否则对该账户做出处理
RabbitMQConfig

package net.xdclass.xdclasssp.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 小滴课堂,愿景:让技术不再难学
 *
 *  新商家审核通过->new_merchant_queue -> 死信消息交换机 -> 死信队列
 *
 * @Description
 * @Author 二当家小D
 * @Remark 有问题直接联系我,源码-笔记-技术交流群
 * @Version 1.0
 **/

    @Configuration
    public class RabbitMQConfig {


    /**
     * 死信队列
     */
    public static final String LOCK_MERCHANT_DEAD_QUEUE = "lock_merchant_dead_queue";

    /**
     * 死信交换机
     */
    public static final String LOCK_MERCHANT_DEAD_EXCHANGE = "lock_merchant_dead_exchange";

    /**
     * 进入死信队列的路由key
     */
    public static final String LOCK_MERCHANT_ROUTING_KEY = "lock_merchant_routing_key";

    /**
     * 创建死信队列
     * @return
     */
    @Bean
    public Queue lockMerchantDeadQueue(){
        return QueueBuilder.durable(LOCK_MERCHANT_DEAD_QUEUE).build();
    }

    /**
     * 绑定死信交换机和死信队列
     * @return
     */
    @Bean
    public Binding lockMerchantBinding(){

        return new Binding(LOCK_MERCHANT_DEAD_QUEUE,Binding.DestinationType.QUEUE,
                LOCK_MERCHANT_DEAD_EXCHANGE,LOCK_MERCHANT_ROUTING_KEY,null);
    }


    /**
     * 普通队列,绑定的个死信交换机
     */
    public static final String NEW_MERCHANT_QUEUE = "new_merchant_queue";

    /**
     * 普通的topic交换机
     */
    public static final String NEW_MERCHANT_EXCHANGE = "new_merchant_exchange";

    /**
     * 路由key
     */
    public static final String NEW_MERCHANT_ROUTIING_KEY = "new_merchant_routing_key";



    /**
     * 创建普通交换机
     * @return
     */
    @Bean
    public Exchange newMerchantExchange(){
        return new TopicExchange(NEW_MERCHANT_EXCHANGE,true,false);
    }

    /**
     * 创建普通队列
     * @return
     */
    @Bean
    public Queue newMerchantQueue(){

        Map<String,Object> args = new HashMap<>(3);
        //消息过期后,进入到死信交换机
        args.put("x-dead-letter-exchange",LOCK_MERCHANT_DEAD_EXCHANGE);

        //消息过期后,进入到死信交换机的路由key
        args.put("x-dead-letter-routing-key",LOCK_MERCHANT_ROUTING_KEY);

        //过期时间,单位毫秒
        args.put("x-message-ttl",10000);


        return QueueBuilder.durable(NEW_MERCHANT_QUEUE).withArguments(args).build();
    }

    /**
     * 绑定交换机和队列
     * @return
     */
    @Bean
    public Binding newMerchantBinding(){

        return new Binding(NEW_MERCHANT_QUEUE,Binding.DestinationType.QUEUE,
                NEW_MERCHANT_EXCHANGE,NEW_MERCHANT_ROUTIING_KEY,null);
    }

生产者

 @GetMapping("check")
    public Object check(){

        //修改数据库的商家账号状态  TODO

        rabbitTemplate.convertAndSend(RabbitMQConfig.NEW_MERCHANT_EXCHANGE,RabbitMQConfig.NEW_MERCHANT_ROUTIING_KEY,"商家账号通过审核");

        Map<String,Object> map = new HashMap<>();
        map.put("code",0);
        map.put("msg","账号审核通过,请10秒内上传1个商品");
        return map;
    }

消费者

@Component
@RabbitListener(queues = "lock_merchant_dead_queue")
public class MerchantMQListener {
    @RabbitHandler
    public void messageHandler(String body, Message message, Channel channel) throws IOException {

        long msgTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("msgTag="+msgTag);
        System.out.println("body="+body);
        //做复杂业务逻辑  TODO 商家业务

        //告诉broker,消息已经被确认
        channel.basicAck(msgTag,false);

    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/563408.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

shopee虾皮跨境电商网站商品数据支持网站后缀(.com.my;.vn;.ph)

作为一名技术爱好者&#xff0c;我们总会遇到各种各样的技术问题&#xff0c;需要寻找合适的技术解决方案。而在互联网时代&#xff0c;我们可以快速通过搜索引擎获取丰富的技术资源和解决方案。然而&#xff0c;在不同的技术分享中&#xff0c;我们常常会遇到质量参差不齐的文…

【新星计划·2023】单臂路由的原理讲解

单臂路由是指在路由器的一个接口上通过配置子接口的方式&#xff0c;实现原来互相隔离的VLAN之间可以互相通信。 一、单臂路由概述 网络中通过VLAN技术来实现隔离广播、方便管理及提高安全性等功能&#xff0c;一旦划分VLAN后&#xff0c;同—VLAN之间可以相互通信&#xff0…

【统计模型】瑞典生育率现状与影响因素分析

目录 瑞典生育率现状与影响因素分析 一、研究目的 二、数据来源和相关说明 三、描述性分析 3.1 样本描述 3.2 数据可视化 四、数学建模 4.1 模型建立 4.2 模型结果 &#xff08;1&#xff09;全模型A &#xff08;2&#xff09;全模型B &#xff08;3&#xff09;全…

传奇手游三职业1.80合击服务端三端互通版搭建教程

传奇手游三职业1.80合击服务端三端互通版搭建教程 大家好&#xff0c;我是驰网艾西。随着时代的发展&#xff0c;以前我们热爱的传奇游戏也越来越没有时间玩了&#xff0c;到了一定的年纪大家都有自己的事业以及生活压力。以前我们总是玩PC端所谓的端游&#xff0c;现在大家都…

highcharts矢量图放在图表的最上方

将矢量图对应的y轴的top和height都设置为0 即可 下面红色标注全是y轴的设置 以上这中图怎么实现 其中top是指图表中每个模块的位置&#xff0c;offset表示偏移的位置&#xff0c;height表示每个模块占据整个图标的高度的百分比&#xff0c;opposite表示该y轴是否在右侧&#xf…

麒麟系统安装HDP【已解决】

麒麟系统安装HDP 麒麟系统安装HDP1、软件版本介绍2、文件替换3 报错解决3.1 解决KeyError: HDP-3.1&#xff08;所有机器&#xff09;3.2 安装smartsense-hst&#xff08;所有机器&#xff09;3.3 解决Non-ASCII character \xe5 in file&#xff08;所有机器&#xff09;3.4 解…

如何解决端口号被占用的方法

在学习JavaWeb的过程中&#xff0c;在运行代码的时候经常会提示端口号被占用的情况&#xff1b;出现这情况的主要原因就是没有正常关闭tomcat。 那么遇到这种情况应该怎么解决呢&#xff1f; 首先第一种方式就是把电脑关机重启&#xff0c;这种方法可谓是百试百灵&#xff1b;另…

分类逻辑回归实例一

一、实例背景 假设根据【推荐分值】来对推荐者类型进行分类&#xff1a;高推荐、中推荐、低推荐 二、任务目标 训练出一个模型&#xff0c;来实现根据【推荐分值】&#xff0c;来预测【推荐类型】的分类 三、机器学习实现 1. 核心步骤 实现全流程&#xff1a; 1. 1 建立…

Niagara—— Events and Event Handlers

目录 一&#xff0c;Events 二&#xff0c;Event Handlers 多数情况下&#xff0c;系统中的发射器需相互交互&#xff0c;才能创建所需效果&#xff1b;意味着&#xff0c;一发射器生成数据&#xff0c;另一发射器监听该数据&#xff0c;以执行相应行为&#xff1b;在Niagar…

Linux-初学者系列_docker

目录 Linux-初学者系列_docker一、概念二、安装docker&#xff08;可忽略 跳到第三步&#xff09;三、获取镜像1、下载nginx镜像2、查看本地镜像3、将镜像运行成一个容器01-查看运行的容器02-通过ip端口号访问03-删除端口04-指定镜像名字 4、dockerfile构建镜像5、dockersave构…

HLS入门实践

HLS入门实践 文章目录 HLS入门实践1.HLS基本知识简述1.1 HLS简介1.2 HLS相关知识概念 2. HLS技术认识2.1 与VHDL/Verilog关系2.2 关键技术问题2.3 存在的技术局限性 3. HLS 完成 led 灯闪烁3.1. 新建一个 HLS 工程3.2 添加源文件3.2.1 led.h3.2.2 led.cpp3.2.3 添加 C 仿真文件…

<Linux开发>驱动开发 -之-pinctrl子系统

&#xff1c;Linux开发&#xff1e;驱动开发 -之-pinctrl子系统 交叉编译环境搭建&#xff1a; &#xff1c;Linux开发&#xff1e; linux开发工具-之-交叉编译环境搭建 uboot移植可参考以下&#xff1a; &#xff1c;Linux开发&#xff1e; -之-系统移植 uboot移植过程详细记…

Zabbix 2.0 实验

zabbix自动发现与自动注册 ---------------nginx自动监控报警------------------- 在/etc/nginx/conf.d/default.conf 文件内添加 location /nginx_status {stub_status on;access_log off;allow 127.0.0.1;deny all;}curl -s http://127.0.0.1/nginx_status curl -s http:/…

卷麻了,面试了一个00后,绝对能称为是卷王之王....

公司前段缺人&#xff0c;也面了不少测试&#xff0c;结果竟然没有一个合适的。一开始瞄准的就是中级的水准&#xff0c;也没指望来大牛&#xff0c;提供的薪资也不低&#xff0c;面试的人很多&#xff0c;但平均水平很让人失望。令我印象最深的是一个00后测试员&#xff0c;他…

数据链路层:Ethernet以太网协议

首先Ethernet、IEEE802.3、PPP和HDLC都是数据链路层的协议&#xff0c;只不过后面三个不常用而已。Ethernet和IEEE802.3属于以太链路层协议&#xff0c;数据链路层最常用的协议是Etnernet以太网协议。 定义&#xff1a; Ethernet以太网协议&#xff0c;用于实现链路层的数据传…

Build History

ISO 登录https://next.itellyou.cn/Original/ Window 盛千装机助手https://wwza.lanzouo.com/s/SQZJ Ubuntu Windows / Linux —— U盘启动盘制作 - 知乎准备工作&#xff1a;系统镜像&#xff1a; Windows 10 / Windows 11 & Manjaro / Ubuntu &#xff08;自行选择下载…

MySQL主从同步(开GTID)

目录 一、搭建简单的主从同步 二、mysql删除主从&#xff08;若没有配置过可以不用进行这一步&#xff09; 1、停止slave服务器的主从同步 2、重置master服务 三、开启GTID 1、Master配置 2、Slave配置 一、搭建简单的主从同步 GTID原理&#xff1a;http://t.csdn.cn/g…

3.36 haas506 2.0开发教程-example -OLED显示生成二维码(python)

OLED显示生成二维码-python 应用场景案例说明1.OLED显示规则2.硬件3.连线图 代码源码链接 应用场景 二维码在各个领域中的应用越来越广泛&#xff0c;其中一些主要应用场景包括&#xff1a; 电子商务&#xff1a;通过二维码&#xff0c;用户可以轻松链接到商家的网站&#xff…

3年前的我废人一个,庆幸当时入了软件测试这行

为什么会学习软件测试&#xff1f; 已经28岁了&#xff0c;算一下快过去3年了&#xff0c;刚毕业那会工作了一年&#xff0c;因为自己当时很迷茫&#xff08;觉得自己挺废的&#xff09;&#xff0c;所以就没去工作就一直在家&#xff0c;家里固定每个月给点生活费&#xff0c…

Pytest模式执行python脚本不生成allure测试报告

1.安装allure 下载allure的zip安装包将allure.zip解压到python的lib目录中将allure的bin路径添加到环境变量path中(注意&#xff1a;配置环境变量后&#xff0c;一定要重启电脑。因为环境变量没生效&#xff0c;我搞了半天在pycharm不能生成报告&#xff0c;在cmd中可以生成报…