Centos安装RabbitMQ,JavaSpring发送RabbitMQ延迟延时消息,JavaSpring消费RabbitMQ消息

news2025/1/23 22:37:50

1,版本说明

erlang 和 rabbitmq 版本说明
https://www.rabbitmq.com/which-erlang.html
确认需要安装的mq版本以及对应的erlang版本。

2,下载安装文件

RabbitMQ下载地址:
https://packagecloud.io/rabbitmq/rabbitmq-server

Erlang下载地址:
https://packagecloud.io/rabbitmq/erlang

RabbitMQ延迟消息插件下载
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

下载文件如图

在这里插入图片描述

3,安装步骤

3.1, 查询是否有安装过erlang、rabbitmq, 查询到有的话需要删除。

	rpm -qa | grep rabbitmq-server
	rpm -qa | grep erlang
	# 删除
	yum -y remove rabbitmq-server.noarch

3.2, 本地安装erlang

	yum localinstall erlang-23.2.7-2.el7.x86_64.rpm
	# 查询安装的版本
	erl -version
	# Erlang (SMP,ASYNC_THREADS,HIPE) (BEAM) emulator version xxx

3.3, 本地安装rabbitmq

	yum localinstall rabbitmq-server-3.9.0-1.el7.noarch.rpm
	# 启动rabbitmq
	systemctl start rabbitmq-server

	# 查看rabbitmq状态
	systemctl status rabbitmq-server

	# 设置rabbitmq服务开机自启动
	systemctl enable rabbitmq-server

	# 关闭rabbitmq服务
	systemctl stop rabbitmq-server

	# 重启rabbitmq服务
	systemctl restart rabbitmq-server

3.4, mq 端口开放:

	firewall-cmd --zone=public --add-port=5672/tcp --permanent
	firewall-cmd --zone=public --add-port=15672/tcp --permanent
	firewall-cmd --reload
	firewall-cmd --zone=public --list-ports

3.5, 安装mq管理界面

	
	# 启用管理界面插件
	rabbitmq-plugins enable rabbitmq_management

	curl http://localhost:15672 就可以打开web管理页面

	# rabbitmq有一个默认的账号密码guest,但该情况仅限于本机localhost进行访问,所以需要添加一个远程登录的用户

	# 添加用户
	rabbitmqctl add_user 用户名 密码

	rabbitmqctl add_user admin 123456

	# 设置用户角色,分配操作权限
	rabbitmqctl set_user_tags 用户名 角色

	rabbitmqctl set_user_tags admin administrator

	# 为用户添加资源权限(授予访问虚拟机根节点的所有权限)
	rabbitmqctl set_permissions -p / 用户名 ".*" ".*" ".*"

	rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

	# 角色有四种:
	# administrator:可以登录控制台、查看所有信息、并对rabbitmq进行管理
	# monToring:监控者;登录控制台,查看所有信息
	# policymaker:策略制定者;登录控制台指定策略
	# managment:普通管理员;登录控制

	# 修改密码
	rabbitmqctl change_ password 用户名 新密码

	# 删除用户
	rabbitmqctl delete_user 用户名

	# 查看用户清单
	rabbitmqctl list_users

3.6, 延迟消息插件安装:

    # 把插件包先复制到	 /usr/lib/rabbitmq/lib/rabbitmq_server-3.9.0/plugins
    cp rabbitmq_delayed_message_exchange-3.9.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.9.0/plugins/
	rabbitmq-plugins enable rabbitmq_delayed_message_exchange
	#重启mq		
	systemctl restart rabbitmq-server
	rabbitmq-plugins list

3.7,登录测试

访问地址: ip:15672 账号密码: admin 123456
登录界面

找到交换机 exchange,看看类型是否有延迟消息类型的
在这里插入图片描述

然后就可以写代码去连接发消息了。

4, Java代码

4.1, pom 引入:

         <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

4.2, 配置类:

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

}

4.3, 消息定义配置类:


import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class OrderRabbitMQConfig {

    @Autowired
    private RabbitAdmin rabbitAdmin;
    //================================订单延时=================================
    @Bean
    CustomExchange order_pay_delay_exchange() {
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("order_pay_delay_exchange", "x-delayed-message", true, false, args);
    }
    @Bean
    public Queue order_pay_delay_queue() {
        Queue queue = new Queue("order_pay_delay_queue", true, false, false);
        rabbitAdmin.declareQueue(queue);
        return queue;
    }
    @Bean
    public Binding order_pay_delay_binding() {
        return BindingBuilder.bind(order_pay_delay_queue())
                .to(order_pay_delay_exchange()).with("order_pay_delay_routing").noargs();
    }

    //================================订单支付通知======================================
    @Bean
    public DirectExchange order_pay_notify_exchange() {
        return new DirectExchange("order_pay_notify_exchange", true, false);
    }
    @Bean
    public Queue order_pay_notify_direct_queue() {
        Map<String, Object> argsMap = new HashMap<>();
        argsMap.put("x-max-priority", 5);
        Queue queue = new Queue("order_pay_notify_queue", true, false, false, argsMap);
        rabbitAdmin.declareQueue(queue);
        return queue;
    }
    @Bean
    public Binding ctc_bidding_auction_pay_notify_binding() {
        return BindingBuilder.bind(order_pay_notify_direct_queue())
                .to(order_pay_notify_exchange()).with("order_pay_notify_routing");
    }
}

4.4, 消息发送类:


import cn.hutool.json.JSONUtil;
import com.xxx.rabbitmq.dto.PayOrderNotifyDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class RabbitMQSendUtils {

    private static RabbitTemplate rabbitTemplate;

    @Autowired
    public RabbitMQSendUtils(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * 订单支付延时通知、发送MQ消息
     */
    public static void sendPayDelayMessage(PayOrderNotifyDto dto, final Integer delayTimes) {
        //给延迟队列发送消息
        String msg = JSONUtil.toJsonStr(dto);
        log.info("订单支付延时通知、发送MQ消息: {}, delayTimes={}", msg, delayTimes);
        rabbitTemplate.convertAndSend("order_pay_delay_exchange", "order_pay_delay_routing", msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //给消息设置延迟毫秒值
                message.getMessageProperties().setDelay(delayTimes);
                return message;
            }
        });
    }

    /**
     * 订单支付通知,发送MQ消息
     */
    public static void sendPayNotifyMsg(PayOrderNotifyDto dto) {
        log.info("订单支付通知,发送MQ消息: {}", dto);
        rabbitTemplate.convertAndSend("order_pay_notify_exchange", "order_pay_notify_routing", JSONUtil.toJsonStr(dto));
    }
}

4.5, 消息监听消费类:


import cn.hutool.json.JSONUtil;
import com.xxx.rabbitmq.dto.PayOrderNotifyDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * MQ消费监听
 */
@Slf4j
@Component
public class OrderMQListener {
    /**
     * 订单延时通知 消息
     */
    @RabbitListener(queues = {"order_pay_delay_queue"})
    public void payDelayNotify(Message message) {
        try {
            String msg = new String(message.getBody());
            log.info("【消费】订单延时通知 MQ 消息内容: {}, Message={}", msg, message);
            //支付订单改成超时未支付》取消
            PayOrderNotifyDto dto = JSONUtil.toBean(msg, PayOrderNotifyDto.class);

        } catch (Exception e) {
            log.error("订单延时通知 消息消费失败:", e);
        }
    }
    /**
     * 订单支付通知 消息
     */
    @RabbitListener(queues = {"order_pay_notify_queue"})
    public void payNotify(Message message) {
        try {
            String msg = new String(message.getBody());
            log.info("订单支付通知 MQ 消息内容:{}, {}", msg, message);
            PayOrderNotifyDto payOrderNotifyDto = JSONUtil.toBean(msg, PayOrderNotifyDto.class);
        } catch (Exception e) {
            log.error("订单支付通知 消息消费失败:", e);
        }
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1138528.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在微信小程序云开发中引入Vant Weapp组件库

介绍 Vant 是一个轻量、可靠的移动端组件库&#xff0c;于 2017 年开源。 目前 Vant 官方提供了 Vue 2 版本、Vue 3 版本和微信小程序版本&#xff0c;并由社区团队维护 React 版本和支付宝小程序版本。 介绍 - Vant Weapp (youzan.github.io) Vant Weapp需要安装 node.js&…

nodejs+vue 电子书阅读系统

本文首先介绍了电子书阅读系统的发展背景与发展现状&#xff0c;然后遵循软件常规开发流程&#xff0c;首先针对系统选取适用的语言和开发平台&#xff0c;随着网络技术的不断发展&#xff0c;多媒体技术应用渐渐的出现在教育领域中&#xff0c;电子书阅读已经成为社会的一个热…

02-URL与资源

浏览​因特网资源 URI是一类更通用的资源标识符&#xff0c;URL与URN是它的子集URL通过描述资源的位置来标识资源HTTP规范将更通用的概念URI作为资源标识符&#xff0c;实际上&#xff0c;HTTP应用程序处理的知识URI的URL子集URL可以通过HTTP之外的其他协议来访问资源。 http链…

Unity C#中LuaTable、LuaArrayTable、LuaDictTable中数据的增删改查

LuaTable、LuaArrayTable、LuaDictTable中数据的增删改查 介绍Lua表lua表初始化lua移除引用lua中向表中添加数据lua中表中移除数据lua表中连接数据lua表中数据排序获取lua表长度获取表中最大值 UnityC#中LuaTableUnityC#中LuaArrayTable、LuaDictTable、LuaDictTable<K,V>…

HTTP发起请求与收到响应的大致过程

可以《《透视 HTTP 协议》Windows 10 搭建最小实验环境》搭建环境&#xff0c;之后才能进行下边的操作。 1.鼠标左键点击两下www目录下的start.bat批处理文件。 2.打开Wireshark&#xff0c;然后选择Adapter for loopback traffic capture。 3.然后把tcp.port 80 || udp.…

基于Springboot自习预约管理系统

功能介绍&#xff1a; 基于Springboot自习预约管理系统。该系统为后台管理系统&#xff0c;无前台。主要角色有&#xff1a;管理员和学生。 主要功能&#xff1a; 学生信息管理&#xff1a;学号、姓名、专业、班级、年级、联系方式 教室信息管理&#xff1a;教室、教室号、楼…

el-form那些事

vue3element-plus el-form那些事 输入框后拼接文字 输入框后拼接文字 <el-form-item :label"t(location.locationLength)" prop"locationLength"><el-input v-model"form.locationLength" :placeholder"t(location.inputLocation…

深度学习中语义分割、实例分割、目标检测和图像分类区别

语义分割 实例分割 目标检测 语义分割&#xff1a;需要判断每个像素属于哪一个类别&#xff0c;属于像素级别分类标注 实例分割&#xff1a;相较于语义分割 会将同一类别的不同物体进行分离标注 目标检测&#xff1a;输入图像通常包含多个物体&#xff0c;对物体的位置与类别进…

C# 图解教程 第5版 —— 第9章 表达式和运算符

文章目录 9.1 表达式&#xff08;*&#xff09;9.2 字面量9.2.1 整数字面量9.2.2 实数字面量9.2.3 字符字面量9.2.4 字符串字面量 9.3 求值顺序9.3.1 优先级9.3.2 结合性 9.4 简单算术运算符9.5 求余运算符9.6 关系比较运算符和相等比较运算符9.7 递增运算符和递减运算符&#…

Babylonjs学习笔记(五)——创建PBR材质

书接上回&#xff0c;这里讨论PBR材质&#xff01;&#xff01;&#xff01; // 创建天空盒/* */const createSkyBox (scene:Scene):void>{const envTex CubeTexture.CreateFromPrefilteredData(./env/environment.env,scene)scene.environmentTexture envTex;scene.cre…

HTTP 之 options预请求 nginx 解决跨域 postman调试跨域问题

一、HTTP一共有八种常见请求方法 get&#xff1a;参数在url上&#xff0c;浏览器长度有限制&#xff0c;不安全post&#xff1a;参数不可见&#xff0c;长度不受限制put&#xff1a;上传最新内容到指定位置delete&#xff1a;删除请求的url所表示的资源head&#xff1a;不返回…

Android stdio 无法新建或打开AIDL文件(解决方法)

1.在gradle文件中添加如下代码 2.AIDL要求minsdk>16,并且要使aidl true&#xff08;在Gradle中添加&#xff09; android{ buildFeatures { aidl true } } 我们看见&#xff0c;可以创建AIDL文件了 3.接着&#xff0c;我们看到文件出现如下提示 4.在gradle…

CGAL+QT

先安装CGAL和QT 安装完QT其中MSVC 这两个没配置 1、x32配置选择的是 x64配置选择的是 2、CGAL 5.4.5 - Manual: Using CGAL on Windows (with Visual C) 参数文章配置一些环境变量 3、 测试 新建build 进行cmake QT、Boost、CGAL都自动匹配上了&#xff08;环境变量已经配…

出租屋智能视频监控系统方案:全面保卫租客安全

除了我们常见的家庭、社区、园区等智能监控&#xff0c;出租房作为很多人的暂住所也极易发生盗窃等事件&#xff0c;为保障大众租户的财产安全&#xff0c;旭帆科技特地针对出租屋制定了智能监控系统方案。 1、安装智能安防摄像头 高清晰度、夜视功能良好的智能摄像头&#xf…

vue 复杂的流程图实现--antv/g6

可以先看下对应的文档&#xff1a;G6 Demos - AntV npm install --save antv/g6 实现如图&#xff1a; <template><div class"drawflow"><div id"mountNode"></div></div> </template> <script> import G6 fr…

FreeRTOS 互斥量 优先级反转(翻转)和优先级继承 详解

目录 什么是互斥量&#xff1f; 什么是优先级反转&#xff08;翻转&#xff09;和优先级继承 互斥量相关 API 函数 优先级反转&#xff08;翻转&#xff09;示例 使用互斥量优化优先级反转&#xff08;翻转&#xff09;问题示例 什么是互斥量&#xff1f; 在多数情况下&a…

Perl爬虫程序

以下是一个使用Perl爬虫程序&#xff0c;用于爬取图像。每行代码的中文解释如下&#xff1a; #!/usr/bin/perl ​ use strict; use warnings; use Mojo::UserAgent; use JSON; ​ # 创建一个Mojo::UserAgent实例 my $ua Mojo::UserAgent->new; ​ # 使用获取代理 my $prox…

【机器学习可解释性】2.特征重要性排列

机器学习可解释性 1.模型洞察的价值2.特征重要性排列3.偏依赖图 &#xff08; partial dependence plots &#xff09;4.SHAP Value5.SHAP Value 高级使用 正文 前言 你的模型认为哪些特征最重要&#xff1f; 介绍 我们可能会对模型提出的最基本的问题之一是&#xff1a;哪…

react的table合并行时,出现border-bottom重复问题

背景&#xff1a; 需求是呈现一个表格&#xff0c;根据操作人跟操作时间是否相同来进行合并行数据 数据结构&#xff1a; 经过跟后端的同事商量&#xff0c;需要在每一行数据中返回rowSpanNum的值&#xff0c;前端在column中根据值来判断是否满足合并行&#xff08;没有合并行…

在Go项目中封装AES加解密客户端接口

1.摘要 在一个中型以上的项目中, 我们一般会在项目工程中开辟一个pkg文件夹用来存放一些基础工具接口,比如:数据库、中间件、加解密算法、基础协议等等。在这篇文章中, 我主要分享一下在基于Go语言的项目中, 加解密算法中如何封装一个通用的加解密接口, 并以使用比较广泛的AES…