RabbitMQ的安装使用

news2025/1/12 8:44:19

RabbitMQ是什么?

MQ全称为Message Queue,消息队列,在程序之间发送消息来通信,而不是通过彼此调用通信。
RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。

为什么使用RabbitMQ?

优点:
1、实现应用系统的解耦,客户端只关心发送消息,而不关心处理。
2、异步提升效率,在主业务逻辑发送消息,异步去处理消息
3、流量削峰,将请求放到mq消息队列中,mysql每秒去拉取请求消费,避免请求全部一下子全部打到mysql,请求过多而崩溃

怎么使用RabbitMQ?

1.安装windows的客户端,参考链接3

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

2.java 代码引入相关jar包
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.amqp</groupId>
			<artifactId>spring-amqp</artifactId>
			<version>2.2.3.RELEASE</version>
		</dependency>
3.编写发送,接收消息的工具类
延迟队列配置
package com.next.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @desc 延迟队列配置
 */
@Configuration
public class RabbitDelayMqConfig {

    @Bean("delayDirectExchange")
    public DirectExchange delayDirectExchange() {
        DirectExchange directExchange = new DirectExchange(QueueConstants.DELAY_EXCHANGE, true, false);
        //交换机开启延迟设置true,延迟才会生效
        directExchange.setDelayed(true);
        return directExchange;
    }

    @Bean("delayNotifyQueue")
    public Queue delayNotifyQueue() {
        return new Queue(QueueConstants.DELAY_QUEUE);
    }

    @Bean("delayBindingNotify")
    public Binding delayBindingNotify(@Qualifier("delayDirectExchange") DirectExchange delayDirectExchange,
                                 @Qualifier("delayNotifyQueue") Queue delayNotifyQueue) {
        return BindingBuilder.bind(delayNotifyQueue).to(delayDirectExchange).with(QueueConstants.DELAY_ROUTING);
    }
}

队列配置
package com.next.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

/**
 * @desc 队列配置
 */
@Configuration
public class RabbitMqConfig {

    @Bean("directExchange")
    @Primary
    public DirectExchange directExchange() {
        return new DirectExchange(QueueConstants.COMMON_EXCHANGE, true, false);
    }

    @Bean("notifyQueue")
    @Primary
    public Queue notifyQueue() {
        return new Queue(QueueConstants.COMMON_QUEUE);
    }

    @Bean("bindingNotify")
    @Primary
    public Binding bindingNotify(@Qualifier("directExchange") DirectExchange directExchange,
                                 @Qualifier("notifyQueue") Queue notifyQueue) {
        return BindingBuilder.bind(notifyQueue).to(directExchange).with(QueueConstants.COMMON_ROUTING);
    }
}

发送消息工具类
package com.next.mq;

import com.next.util.JsonMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * @desc 客户端工具类 -- 发送消息
 */
@Component
@Slf4j
public class RabbitMqClient {

    @Resource
    private RabbitTemplate rabbitTemplate;

    //发送同步消息
    public void send(MessageBody messageBody) {
        try {
            //生成唯一的消息id
            String uuid = UUID.randomUUID().toString();
            //初始话消息
            CorrelationData correlationData = new CorrelationData(uuid);
            //使用模板工具类rabbitTemplate 来发消息
            rabbitTemplate.convertAndSend(QueueConstants.COMMON_EXCHANGE, QueueConstants.COMMON_ROUTING,
                    JsonMapper.obj2String(messageBody), new MessagePostProcessor() {
                        @Override
                        public Message postProcessMessage(Message message) throws AmqpException {
                            // 消息持久化
                            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                            //记录日志
                            log.info("message send, {}", message);
                            return message;
                        }
                    }, correlationData);
        } catch (Exception e) {
            //日志打印,以便定位问题
            log.error("message send exception, msg:{}", messageBody.toString(), e);
        }
    }


    /**
     * @desc 发送延迟消息
     */
    public void sendDelay(MessageBody messageBody, int delayMillSeconds) {
        try {
            //设置消息延迟时间
            messageBody.setDelay(delayMillSeconds);
            String uuid = UUID.randomUUID().toString();
            CorrelationData correlationData = new CorrelationData(uuid);
            //延迟交换机和路由
            rabbitTemplate.convertAndSend(QueueConstants.DELAY_EXCHANGE, QueueConstants.DELAY_ROUTING,
                    JsonMapper.obj2String(messageBody), new MessagePostProcessor() {
                        @Override
                        public Message postProcessMessage(Message message) throws AmqpException {
                            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 消息持久化
                            //设置消息延迟的时间(毫秒值)
                            message.getMessageProperties().setDelay(delayMillSeconds);
                            log.info("delay message send, {}", message);
                            return message;
                        }
                    }, correlationData);
        } catch (Exception e) {
            log.error("delay message send exception, msg:{}", messageBody.toString(), e);
        }
    }
}

接收消息工具类
package com.next.mq;

import com.next.dto.RollbackSeatDto;
import com.next.model.TrainOrder;
import com.next.service.TrainOrderService;
import com.next.service.TrainSeatService;
import com.next.util.JsonMapper;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.jackson.type.TypeReference;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * @desc rabbitmq的server端 - 延迟接收消息
 * 用处:在主流程里面发送消息,异步流程里面接收消息,处理。提升代码性能
 */

@Component
@Slf4j
public class RabbitDelayMqServer {

    @Resource
    private TrainSeatService trainSeatService;
    @Resource
    private TrainOrderService trainOrderService;

    @RabbitListener(queues = QueueConstants.DELAY_QUEUE)
    public void receive(String message) {
        log.info("delay queue receive message, {}", message);

        try {
            MessageBody messageBody = JsonMapper.string2Obj(message, new TypeReference<MessageBody>() {
            });

            if (messageBody == null) {
                return;
            }

            switch (messageBody.getTopic()) {
                case QueueTopic.SEAT_PLACE_ROLLBACK:
                    RollbackSeatDto dto = JsonMapper.string2Obj(messageBody.getDetail(), new TypeReference<RollbackSeatDto>() {
                    });
                    trainSeatService.batchRollbackSeat(dto.getTrainSeat(), dto.getFromStationIdList(), messageBody.getDelay());
                    break;
                case QueueTopic.ORDER_PAY_DELAY_CHECK:
                    TrainOrder trainOrder = JsonMapper.string2Obj(messageBody.getDetail(), new TypeReference<TrainOrder>() {
                    });
                    trainOrderService.delayCheckOrder(trainOrder);
                    break;
                default:
                    log.warn("delay queue receive message, {}, no need handle", message);
            }
        } catch (Exception e) {
            log.error("delay queue message handle exception, msg:{}", message, e);
        }
    }
}

参考链接:
1.rabbitMQ到底是个啥东西?
2.超详细!!!Windows下安装RabbitMQ的步骤详解
3.windows安装rabbitmq和环境erlang(最详细版,包括对应关系,安装错误解决方法)
4.RabbitMQ安装或启动后,无法访问http://localhost:15672/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1388074.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

图解结算平台:准确高效给商户结款

这是《百图解码支付系统设计与实现》专栏系列文章中的第&#xff08;4&#xff09;篇。 本章主要讲清楚支付系统中商户结算涉及的基本概念&#xff0c;产品架构、系统架构&#xff0c;以及一些核心的流程和相关领域模型、状态机设计等。 1. 前言 收单结算是支付系统最重要的子…

曲面上偏移命令的查找

今天学习老王的SW绘图时&#xff0c;遇到一个命令找不到&#xff0c;查询了一会终于找到了这个命令&#xff0c;防止自己忘记&#xff0c;特此记录一下&#xff0c;这个命令就是“曲面上偏移”&#xff0c;网上好多的教程都是错误的&#xff0c;实际上这个命令没有在曲面里面&a…

蓝桥杯备赛 | 洛谷做题打卡day3

蓝桥杯备赛 | 洛谷做题打卡day3 sort函数真的很厉害&#xff01; 文章目录 蓝桥杯备赛 | 洛谷做题打卡day3sort函数真的很厉害&#xff01;【深基9.例1】选举学生会题目描述输入格式输出格式样例 #1样例输入 #1 样例输出 #1 我的一些话 【深基9.例1】选举学生会 题目描述 学校…

封装日期时间组件

概述 该组件包含日期选择&#xff0c;任意时间选择、固定时间点选择。 子组件代码(date-picker.vue) <template><div id"date_picker"><el-popover placement"top" width"322" trigger"click" ref"popover&quo…

【学习心得】图解Git命令

图解Git命令的图片是在Windows操作系统中的Git Bash里操作截图。关于Git的下载安装和理论学习大家可以先看看我写的另两篇文章。链接我放在下面啦&#xff1a; 【学习心得】Git快速上手_git学习心得-CSDN博客 【学习心得】Git深入学习-CSDN博客 一、初始化仓库 命令&#xff…

Go后端开发 -- 数组 slice map range

Go后端开发 – 数组 && slice && map && range 文章目录 Go后端开发 -- 数组 && slice && map && range一、数组1.数组的声明和初始化2.数组的传参 二、slice切片1.slice的定义和初始化2.len()和cap()函数3.空切片4.切片截取5…

基于深度学习的实例分割的Web应用

基于深度学习的实例分割的Web应用 1. 项目简介1.1 模型部署1.2 Web应用 2. Web前端开发3. Web后端开发4. 总结 1. 项目简介 这是一个基于深度学习的实例分割Web应用的项目介绍。该项目使用PaddlePaddle框架&#xff0c;并以PaddleSeg训练的图像分割模型为例。 1.1 模型部署 …

智能反射面—流形优化

使用Manopt工具箱适合优化最小化问题&#xff0c;如果你的优化问题是最大化问题&#xff0c;那么需要将其转换为最小化问题然后使用Manopt工具箱求解。 具体安装过程 Matlab添加Manopt - 知乎 (zhihu.com) 优化问题 clc,clear; close all; srng(1);%rand seed N10; GR_num1e3…

MathType中文网站2024最新版本下载及嵌入word教程

MathType是一款专业的数学公式编辑器,兼容Office word,excel等700多种程序,用于编辑数学试卷、书籍、报刊、论文、幻灯演示等文档轻松输入各种复杂的数学公式和符号。 MathType是一款功能强大的数学公式编辑器&#xff0c;广泛用于编写和编辑数学公式。Word是微软公司推出的文…

C语言练习day6

关于牛客网运行超时的问题 [NOIP2008]ISBN号码_牛客题霸_牛客网 题目 思路&#xff1a;这个题目重点在怎么去把这个ISBN号码正确输入&#xff0c;其实这个题目已经提示了我们一点信息&#xff1a;输入描述里说&#xff0c;是一个字符序列&#xff0c;其实我们就可以把这个IS…

中国社科院与新加坡社科大联合培养博士——单证还是双证?

有关博士学位&#xff0c;我想不用多说相信很多人都清楚&#xff0c;博士是我国学位等级中目前为止的最高学位&#xff0c;拥有了博士学位就相当于拥有了最高荣誉&#xff0c;但是&#xff0c;我国教育形式另开设了学历教育&#xff0c;对于学历教育的形式&#xff0c;在职博士…

MIT 6s081 lab1:Xv6 and Unix utilities

Lab1: Xv6 and Unix utilities 作业网址&#xff1a;https://pdos.csail.mit.edu/6.828/2020/labs/util.html Boot xv6(easy) 下载&#xff0c;启动xv6系统 $ git clone git://g.csail.mit.edu/xv6-labs-2020 Cloning into xv6-labs-2020... ... $ cd xv6-labs-2020 $ git …

Maxwell数据同步(增量)

1. Maxwell简介 1.1 Maxwell概述 Maxwell 是由美国Zendesk公司开源&#xff0c;用Java编写的MySQL变更数据抓取软件。它会实时监控Mysql数据库的数据变更操作&#xff08;包括insert、update、delete&#xff09;&#xff0c;并将变更数据以 JSON 格式发送给 Kafka、Kinesi等流…

浅谈智慧路灯安全智能供电方案设计

摘要: 智慧路灯&#xff0c;作为智慧城市、新基建、城市更新的主要组成部分&#xff0c;近些年在各大城市已得到很好的落地和 应用&#xff0c;但其与传统路灯相比集成大量异元异构电子设备&#xff0c;这些设备的供电电压、接口形式、权属单位各不相同&#xff0c; 如何设计一…

《绝地求生》职业选手画面设置推荐 绝地求生画面怎么设置最好

《绝地求生》画面怎么设置最好是很多玩家心中的疑问&#xff0c;如果性能不是问题无疑高特效显示效果更好&#xff0c;但并不是所有画面参数都利于战斗&#xff0c;今天闲游盒带来分享的《绝地求生》职业选手画面设置推荐&#xff0c;赶紧来看看吧。 当前PUBG的图像设置的重要性…

YOLOv5改进 | 主干篇 | 12月份最新成果TransNeXt特征提取网络(全网首发)

一、本文介绍 本文给大家带来的改进机制是TransNeXt特征提取网络,其发表于2023年的12月份是一个最新最前沿的网络模型&#xff0c;将其应用在我们的特征提取网络来提取特征&#xff0c;同时本文给大家解决其自带的一个报错&#xff0c;通过结合聚合的像素聚焦注意力和卷积GLU&…

1131. 拯救大兵瑞恩(dp思想运用,set)

1131. 拯救大兵瑞恩 - AcWing题库 1944 年&#xff0c;特种兵麦克接到国防部的命令&#xff0c;要求立即赶赴太平洋上的一个孤岛&#xff0c;营救被敌军俘虏的大兵瑞恩。 瑞恩被关押在一个迷宫里&#xff0c;迷宫地形复杂&#xff0c;但幸好麦克得到了迷宫的地形图。 迷宫的…

MySQL(三)——函数

上期文章 MySQL&#xff08;二&#xff09;——SQL 文章目录 上期文章字符串函数数值函数日期函数流程函数总结 函数&#xff1a;一段可以直接被另一段程序调用的程序或代码 字符串函数 函数功能CONCAT(S1,S2,…Sn)字符串拼接&#xff0c;将S1,S2,…Sn拼接成一个字符串LOWER…

分布式光伏运维平台在提高光伏电站发电效率解决方案

摘要&#xff1a;伴随着能源危机和环境恶化问题的日益加重&#xff0c;科技工作者进一步加大对新能源的开发和利用。太阳能光伏发电作为新型清洁能源的主力军&#xff0c;在实际生产生活中得到了广泛的应用。然而&#xff0c;光伏发电效率偏低&#xff0c;成为制约光伏发电发展…