利用RabbitMQ实现消息投递削峰填谷

news2024/11/8 17:01:35

目录

异步和同步如何选择

异步线程 同步收发消息

一、导入依赖库

二、创建RabbitMQ配置类

三、创建消息任务类


异步和同步如何选择

·依靠多线程,Java代码可以同步执行也可以异步执行

·RabbitMQ提供了同步和异步两种收发消息模式

·我们采用 Java异步线程 MQ同步收发消息

异步线程 同步收发消息

一、导入依赖库

在 pom.xml 文件中添加RabbitMQ的依赖库 

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.9.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

二、创建RabbitMQ配置类

        连接 RabbitMQ 需要用到 ConnectionFactory ,所以我们要自己创建好 ConnectionFactory 对象然后注册给Spring框架,这就需要我们创建 RabbitMQConfig 类。 

@Configuration
public class RabbitMQConfig {
    @Bean
    public ConnectionFactory getFactory(){
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        return factory;
    }
}

三、创建消息任务类

        以前我们使用异步多线程的方式发送邮件,那么这次我们要创建的多线程任务类是用来收发RabbitMQ消息的,而且内部包含了同步执行和异步执行两种方式。

 

@Component
@Slf4j
public class MessageTask {
    @Autowired
    private ConnectionFactory factory;

    @Autowired
    private MessageService messageService;

    /**
     * 同步发送消息
     *
     * @param topic 主题
     * @param entity 消息对象
     */
    public void send(String topic, MessageEntity entity) {
        // 向MongoDB保存消息数据,返回消息ID
        String id = messageService.insertMessage(entity);

        // 向RabbitMQ发送消息
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel();
        ) {
            // 连接到某个Topic
            channel.queueDeclare(topic, true, false, false, null);

            // 存放属性数据
            HashMap map = new HashMap();
            map.put("messageId", id);

            // 创建AMQP协议参数对象,添加附加属性
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(map).build();
            channel.basicPublish("", topic, properties, entity.getMsg().getBytes());
            log.debug("消息发送成功");
        } catch (Exception e) {
            log.error("执行异常", e);
            throw new EmosException("向MQ发送消息失败");
        }
    }

    @Async
    public void sendAsync(String topic, MessageEntity entity) {
        send(topic, entity);
    }

    /**
     * 同步接收数据
     *
     * @param topic 主题
     * @return 接收消息数量
     */
    public int receive(String topic) {
        int i = 0;

        // 接收消息数据
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel();
        ) {
            // 从队列中获取消息,不自动确认
            channel.queueDeclare(topic, true, false, false, null);
            // Topic中有多少条数据未知,所以使用死循环接收数据,直到接收不到消息,退出死循环
            while (true) {
                // 创建响应接收数据,禁止自动发送Ack应答
                GetResponse response = channel.basicGet(topic, false);
                if (response != null) {
                    AMQP.BasicProperties properties = response.getProps();

                    // 获取附加属性对象
                    Map<String, Object> map = properties.getHeaders();
                    String messageId = map.get("messageId").toString();
                    // 获取消息正文
                    byte[] body = response.getBody();
                    String message = new String(body);
                    log.debug("从RabbitMQ接收的消息:" + message);

                    MessageRefEntity entity = new MessageRefEntity();
                    entity.setMessageId(messageId);
                    entity.setReceiverId(Integer.parseInt(topic));
                    entity.setReadFlag(false);
                    entity.setLastFlag(true);
                    
                    // 把消息存储在MongoDB中
                    messageService.insertRef(entity);
                    // 数据保存到MongoDB后,才发送Ack应答,让Topic删除这条消息
                    long deliveryTag = response.getEnvelope().getDeliveryTag();
                    channel.basicAck(deliveryTag, false);
                    i++;
                }
                else {
                    // 接收不到消息,则退出死循环
                    break;
                }
            }
        } catch (Exception e) {
            log.error("执行异常", e);
            throw new EmosException("接收消息失败");
        }
        return i;
    }

    @Async
    public int receiveAsync(String topic) {
        return receive(topic);
    }

    /**
     * 同步删除消息队列
     *
     * @param topic 主题
     */
    public void deleteQueue(String topic){
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel();
        ) {
            channel.queueDelete(topic);
            log.debug("消息队列成功删除");
        }catch (Exception e) {
            log.error("删除队列失败", e);
            throw new EmosException("删除队列失败");
        }
    }

    @Async
    public void deleteQueueAsync(String topic){
        deleteQueue(topic);
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/709767.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MongoRepository

一、介绍 MongoRepository是一个接口,与HibernateRepository类似,通过继承MongoRepository接口,我们可以非常方便地实现对一个MongoDB集合中的文档数据进行增删改查,示例如下所示: import org.bson.types.ObjectId; import org.springframework.data.mongodb.repository…

[数据结构 -- 手撕排序第一篇] 插入排序

目录 1、常见的排序算法 2、插入排序的思路 2.1 基本思想 2.2 直接插入排序 2.2.1 单趟排序的思路 2.2.2 单趟排序代码实现 3、插入排序代码 4、插入排序打印测试 5、插入排序的时间复杂度 5.1 最坏情况 5.2 最好情况 6、直接插入排序的特性总结 1、常见的排序算法 2、插入排序…

SpringMVC (二) 第一个MVC程序

学习回顾&#xff1a;SpringMVC &#xff08;一&#xff09; 什么是SpringMVC Hello&#xff0c;SpringMVC 现在我们来看看如何快速使用SpringMVC编写我们的程序吧&#xff01; 一、配置版 1、新建一个Moudle &#xff0c; springmvc-02-hello &#xff0c; 添加web的支持&…

Spring面试题--AOP

什么是AOP&#xff0c;你们项目中有没有使用到AOP&#xff1f; AOP称为面向切面编程&#xff0c;用于将那些与业务无关&#xff0c;但却对多个对象产生影响的公共行为和逻辑&#xff0c;抽取并封装为一个可重用的模块&#xff0c;这个模块被命名为“切面”&#xff08;Aspect&a…

2024中山大学898水文地质与工程地质考研初试复习资料

C8260153[电子书]2024年中山大学898水文地质与工程地质考研精品资料 说明&#xff1a;本套资料由高分研究生潜心整理编写&#xff0c;高清电子书&#xff0c;考研推荐资料。 一、考研真题及重点名校真题 1.附赠重点名校真题 ①重点名校&#xff1a;水文地质学基础2010-2013…

魏副业而战:抖音图文带货玩法,月入5w+

我是魏哥&#xff0c;与其在家躺平&#xff0c;不如魏副业而战&#xff01; 最近魏哥一直在研究短视频带货&#xff0c;看了很多案例&#xff0c;发现了一些NB的账号。 说真的&#xff0c;视频带货真的可以认真的研究研究&#xff0c;不管是做直播&#xff0c;还是发视频&…

【C++11】左值引用 与 右值引用

定义 左值 / 左值引用 左值&#xff08;Lvalue&#xff09;&#xff1a; 左值是一个表示数据的表达式(如变量名或解引用的指针)&#xff0c;我们可以 对它取地址 可以对它赋值&#xff0c;左值可以出现赋值符号的左边&#xff0c;右值不能出现在赋值符号左边。定义时const修…

青少年机器人技术一级核心知识点:机械结构及模型(二)

随着科技的不断进步&#xff0c;机器人技术已经成为了一个重要的领域。在这个领域中&#xff0c;机械结构是机器人设计中至关重要的一部分&#xff0c;它决定了机器人的形态、运动方式和工作效率。对于青少年机器人爱好者来说&#xff0c;了解机械结构的基础知识&#xff0c;掌…

持续集成工具Jenkins安装和部署

前言 Jenkins的执行流程图如下&#xff1a; 1. 前期准备 1.1 安装JDK 目前新版本的Jenkins对JDK的要求基本上都在JDK11以上&#xff0c;所以我这边将我服务器的JDK版本升级成为JDK11。 具体升级步骤如下&#xff1a; 下载安装包 官网可能需要注册账号&#xff0c;这里我…

微信:如何查询自己名下已实名认证绑定的几个微信账户?

你知道如何查询自己名下已实名认证绑定了几个微信账户吗&#xff1f;微信规则同一个人最多可以注册绑定完成5个微信账户认证&#xff0c;如果想注册新微信号&#xff0c;必须保证实名认证微信账户不足5个。而且通过查询自己名下实名认证微信账户还可以确认&#xff0c;自己的身…

LINUX系统(ubuntu)安装以及应用调试(不定时更新)

一&#xff1a;linux的介绍 Linux是一种基于UNIX操作系统的开源&#xff08;Open Source&#xff09;操作系统。它由芬兰计算机科学家 Linus Torvalds 在1991年首次发布&#xff0c;目前已经发展成为最流行和广泛使用的操作系统之一。 Linux以其稳定性、安全性和灵活性而闻名…

07-C++学习笔记-函数

&#x1f4da; 函数的概念 函数是一段可重复使用的代码块&#xff0c;用于完成特定的任务。通过使用函数&#xff0c;可以将程序划分为多个模块&#xff0c;提高代码的可读性、可维护性和复用性。 在C中&#xff0c;函数由函数头和函数体组成。函数头包含函数的返回类型、函数…

日期格式化不起作用 2022-09-18T05:25:30.000+00:00

java->web JsonFormat(pattern “yyyy-MM-dd HH:mm:ss”)一般版本问题或依赖冲突不起作用 解决&#xff1a; spring:jackson:serialization:write-dates-as-timestamps: falsedate-format: yyyy-MM-dd HH:mm:ss这个配置会在java对象传输给web前端的时候对日期的字段进行…

Linux 查看端口占用命令

文章目录 1、lsof -i:端口号2、netstat 命令2.1 netstat -tunlp 命令2.2 netstat -anp 命令 1、lsof -i:端口号 用于查看某一端口的占用情况&#xff0c;比如查看5000端口使用情况&#xff1a; lsof -i:5000常用命令&#xff1a; lsof -i:5000&#xff1a;查看5000端口占用 …

QT或VS2015报错:Error: C2661: QColor::ct::ct: 没有重载函数接受 5 个参数解决方案

安装了QT5.14.2 MSCV2015配置并同时在QT或VS2015测试并运行都提示没有重载函数接受 5 个参数。 同时还会出现C2134错误&#xff1a;QMetaObject::SuperData::operator const QMetaObject *: 调用不会生成常数表达式的错误 搜索了网络上的结果都让换其它版本&#xff0c;没有…

高压线路零序电流方向保护程序逻辑原理(一)

一、微机型零序电流方向保护概念 &#xff08;一&#xff09;保护电流元件的配置 零序电流方向保护是反应大接地电流系统的线路发生接地故障时&#xff0c;零序电流分量大小和方向的多段式电流方向保护。在我国大接地电流系统线路上都装设了这种接地保护装置&#xff0c;这种保…

(推荐)Abaqus中C++子程序开发入门

ABAQUS是支持C子程序开发的&#xff0c;相比于传统的Fortran&#xff0c;C作为高级语言的优势不言而喻&#xff0c;再搭配优秀的C程序库&#xff0c;使得我们的编程效率大大提高&#xff0c;尤其是对于熟悉C编程的开发者&#xff0c;不失为一种更好的选择。 1 软件配置 1.1 环…

20230702 正态分布的几个性质

正态分布以及高斯函数的定义 如果随机变量 X X X 的密度函数为 f μ , σ ( x ) 1 σ 2 π e − ( x − μ ) 2 2 σ 2 , x ∈ R , σ > 0 f_{\mu, \sigma}(x)\frac{1}{\sigma \sqrt{2 \pi}} e^{-\dfrac{(x-\mu)^2}{2 \sigma^2}}, \quad x \in \mathbb{R}, \sigma>0 …

the size of an array must be greater than zero

keil默认不支持数组定义的情况buf[0]

Pytorch深度强化学习1-2:详解K摇臂赌博机模型和ϵ-贪心算法

目录 0 专栏介绍1 K-摇臂赌博机2 ϵ \epsilon ϵ-贪心算法3 softmax算法4 Python实现与分析 0 专栏介绍 本专栏重点介绍强化学习技术的数学原理&#xff0c;并且采用Pytorch框架对常见的强化学习算法、案例进行实现&#xff0c;帮助读者理解并快速上手开发。同时&#xff0c;…