通过rabbitmq生成延时消息,并生成rabbitmq镜像

news2024/11/28 13:55:25

通过rabbitmq生成延时消息队列,并生成rabbitmq镜像

  • 整体描述
    • 1. 使用场景
    • 2. 目前问题
    • 3. 前期准备
  • 具体步骤
    • 1. 拉取镜像
    • 2. 运行镜像
    • 3. 安装插件
    • 4. 代码支持
      • 4.1 config文件
      • 4.2 消费监听
      • 4.2 消息生产
    • 5. 功能测试
  • 镜像操作
    • 1. 镜像制作
    • 2. 镜像导入
  • 总结

整体描述

1. 使用场景

在使用消息队列时,我们有时候需要生成一些延时消息,比如判断一个任务的开始时间,我在创建任务的时候计算出此时距离任务开始的时间,然后往消息队列里发送一个延时消息,我们希望等到任务开始的时候,再消费此消息,此时任务开始,可以进行一些业务上的操作。

2. 目前问题

之前写过一篇创建rabbitmq镜像的文章,链接: 在centos搭建rabbitmq并制作docker镜像,使用的rabbitmq的版本是3.6.8,只能通过过期时间expiration来设置消息的过期时间,在消息过期的时候,会进入死信队列中,也能达到上述要求。但是,但是,这个过期时间expiration,rabbitmq在处理的时候有个坑,前面消息如果没有过期,后面的消息就算过期了,也不会触发,就是先发的消息没有到期,之后再发的消息就算到期了,也不会触发回调。这显然不行。

3. 前期准备

需要准备的主要就是docker环境,这个可以自行搜一下怎么安装docker环境,由于和本文主要讲的内容关系不大,略…

具体步骤

为了解决此问题,我们可以用延时队列插件来实现,这个插件时一个开发者写的,在github上但是已经被rabbitmq官方接受了,所以可以放心用。

1. 拉取镜像

首先我们先拉取一个rabbitmq的官方镜像进行操作,这个需要注意一下拉取的版本,由于延时队列的插件支持的版本是3.7之后的rabbitmq,所以需要拉取3.7之后的,我这拉取的是3.8.17版本。在命令行输入:

docker pull rabbitmq:3.8.17-management

注:这个如果报错,看下自己的docker环境有没有问题。带management是带管理页面的镜像,我们选用的带management的镜像,后期使用的时候好操作和定位问题。

2. 运行镜像

拉取成功之后,使用命令:

docker images

查看镜像是否拉取成功,如下就是成功了:
rabbitmq镜像
之前用的3.6.8的,不支持延时消息队列的插件…
然后运行镜像,创建容器并启动:

docker run --name rabbitmq-server -p 5672:5672 -p 15672:15672 -d rabbitmq:3.8.17-management

此时用:

docker ps -a

查看容器:
rabbitmq容器
容器已经创建并启动,我们通过web页面可以访问rabbitmq的管理页面,在浏览器输入:http://localhost:15672/
默认账号:guest,密码:guest
rabbitmq登录页面

3. 安装插件

此时rabbitmq已经运行,我们需要安装插件来支持延时消息队列,插件下载地址
选择相应的rabbitmq版本进行下载,注意版本不要选错了。下载完是一个rabbitmq_delayed_message_exchange-3.8.0.ez的文件,我们需要把这个文件上传到docker的/opt/rabbitmq/plugins目录下。
上传之后,进入/opt/rabbitmq/sbin目录执行如下命令让插件生效:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

执行之后看到如下就成功了:
插件启动成功
成功之后刷新一下管理页面,在新建交换机那里,type能多一个x-delayed-message的选项:
添加延时交换机
此时,我们的rabbitmq就配置完成了。

4. 代码支持

rabbitmq目前已经可以接收延时消息了,在代码端我们也需要进行相应的修改,以达到发送延时消息的目的。

4.1 config文件

package com.thcb.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * RabbitMQ的配置类
 *
 * @author thcb
 * @date 2023-09-05
 */
@Configuration
public class RabbitMqConfig {

    // 交换机
    private static final String DELAYED_EXCHANGE = "delayed.exchange";

    // 队列
    private static final String DELAYED_QUEUE = "delayed.queue";

    // 路由
    private static final String DELAYED_ROUTING_KEY = "delayed.routingKey";

    /**
     * 队列
     */
    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE);
    }

    /**
     * 交换机
     */
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", false, false, args);
    }

    /**
     * 绑定延迟队列和交换机
     */
    @Bean
    public Binding delayQueueBindingDelayExchange() {
        return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_ROUTING_KEY).noargs();
    }

}

4.2 消费监听

package com.thcb.rabbitmq.recevier;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 消费监听
 *
 * @author thcb
 * @date 2023-09-05
 */
@Slf4j
@Component
public class DelayQueueReceiver {

    @RabbitListener(queues = "delayed.queue")
    public void receiveDelayedQueue(Message message) {
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到DelayedQueue消息:{}", new Date().toString(), msg);
    }

}

4.2 消息生产

这里创建一个controller来生产消息,里面有两个接口,一个生产消息的延时时间是5秒,另一个是30秒,用来测试延时时间。

package com.thcb.rabbitmq.controller;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * 消息生产controller
 *
 * @author thcb
 * @date 2023-09-05
 */
@RestController
@RequestMapping("/HelloController")
public class HelloController {

    private static final Logger log = LoggerFactory.getLogger(HelloController.class);

    @Autowired
    private AmqpTemplate rabbitTemplate;

    @RequestMapping("/sendXDLMessage1")
    @ResponseBody
    public String sendXDLMessage1() {
        int time = 5000;
        String message = "{\"type\":\"sendXDLMessage1\"}";
        log.info("当前时间:{},发送一条延迟{}毫秒的信息给延迟队列:{}", new Date().toString(), time, message);
        rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingKey", message, msg -> {
            msg.getMessageProperties().setDelay(time);
            return msg;
        });
        return "sendXDLMessage1 success";
    }

    @RequestMapping("/sendXDLMessage2")
    @ResponseBody
    public String sendXDLMessage2() {
        int time = 30000;
        String message = "{\"type\":\"sendXDLMessage2\"}";
        log.info("当前时间:{},发送一条延迟{}毫秒的信息给延迟队列:{}", new Date().toString(), time, message);
        rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingKey", message, msg -> {
            msg.getMessageProperties().setDelay(time);
            return msg;
        });
        return "sendXDLMessage2 success";
    }
}

5. 功能测试

代码修改完,就可以测试了,启动工程之后,在rabbitmq管理页面能看到自动创建了如下交换机和队列:
创建的交换机
创建的队列
可以看到交换机的类型是x-delayed-message。
接下来就可以调用测试接口,生产2条消息看看了。先调用sendXDLMessage2接口,生产一个延时30秒的消息,过一会再调用sendXDLMessage1的接口,生产一个延时5秒的消息。log结果如下:
运行结果
结果符合我们的预期,先发的30秒延时消息消息2,之后发的5秒延时消息1,然后过了5秒消息1先回调,之后30秒消息2回调。

镜像操作

使用docker主要就是要制作镜像,之后直接就可以用了要不每次还得配置。提示制作之前,把现在的队列和交换机都删除,队列和交换机是通过代码创建的,账号可以换一个,默认的guest不太安全。
一切都准备就绪,就可以制作镜像了。

1. 镜像制作

将镜像打包成tar文件。

docker commit 【镜像id】 rabbitmq:3.8.17
docker save -o rabbitmq-3.8.17.tar rabbitmq:3.8.17

2. 镜像导入

制作完镜像进行导入

docker load <rabbitmq-3.8.17.tar
docker run -d -p 5672:5672 -p 15672:15672 --privileged --restart=always --name rabbitmq rabbitmq:3.8.17

总结

以上就是rabbitmq延时消息的相关内容,另外这个延时消息在消息很多的情况下可能会有一些性能问题,使用的时候需要注意一下。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/975902.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySql学习笔记08——事务介绍

事务 基本概念 事务是一个完整的业务逻辑&#xff0c;是一个最小的工作单元&#xff0c;不可再分。 一个完整的业务逻辑包括一系列的操作&#xff0c;这些操作是整个业务逻辑中的最小单元&#xff0c;这些操作要么同时成功&#xff0c;要么同时失败。 由于只有DML语句中才会…

C++那些事之Step by step上手grpc

C那些事之grpc小Demo github上比较火的rpc有grpc、brpc&#xff0c;腾讯内部比较牛逼的trpc等等&#xff0c;这些rpc支持不同的语言、不同平台。今天来聊聊如何使用grpc&#xff0c;从一个简单的demo入手&#xff0c;整个项目使用CMake构建&#xff0c;一个非常标准的rpc项目管…

公园气象站——观测实时气象,保障游客安全

公园气象站是一种用于监测和记录气象数据的系统。在公园内设置公园气象站可以帮助我们了解公园内的气候状况&#xff0c;包括空气湿度、空气温度、风速和风向等参数。这些数据是公园管理、游客安全和环境保护等方面重要的辅助依据。 负氧离子监测&#xff1a;负氧离子是指空气…

Serverless Framework 亚马逊云(AWS)中国地区部署指南

Serverless Framework 亚马逊云(AWS)中国地区部署指南 Serverless Framework 亚马逊云(AWS)中国地区部署指南 前言前置准备 1. 账号的注册2. 全局安装 serverless3. 设置你的系统环境变量4. 设置部署凭证 快速部署一个 hello world 创建入口函数 index.js event 参数context 参…

学习Bootstrap 5的第四天

目录 表格 基础表格 实例 条纹表格 实例 带边框表格 实例 有悬停效果的行 实例 黑色/深色表格 实例 黑色/深色条纹表格 实例 可悬停的黑色/深色表格 实例 无边框表格 实例 上下文类 可用的上下文类&#xff1a; 实例 表头颜色 实例 小型表格 实例 响应…

SpringBoot实现Excel导入导出

话不多说&#xff0c;直接上代码 依赖文档 找到pom文件&#xff0c;如下图所示 引入需要的依赖 <!-- hutool--><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.7.20</version>&…

如何利用顶级AI简历工具优化求职过程

追求梦想工作既是挑战又是机会。而在这一竞争激烈的职场中&#xff0c;拥有一份完美的简历成为与雇主初次互动的黄金名片。但问题是&#xff0c;如何才能使简历真正脱颖而出&#xff1f; 为何简历如此关键? 很多时候&#xff0c;简历的影响力被忽视&#xff0c;尽管它实际上…

2023年下半年广州/深圳软考(中/高级)认证报名,当然弘博创新

软考是全国计算机技术与软件专业技术资格&#xff08;水平&#xff09;考试&#xff08;简称软考&#xff09;项目&#xff0c;是由国家人力资源和社会保障部、工业和信息化部共同组织的国家级考试&#xff0c;既属于国家职业资格考试&#xff0c;又是职称资格考试。 系统集成…

新晋开源项目 DisJob 加入 Dromara 社区,分布式任务调度框架

作者简介 网名Ponfee&#xff0c;Dromara开源组织成员&#xff0c;dromara/disjob项目作者。在国内多个一线大厂待过&#xff0c;有过后端、全栈、大数据等相关工作经历。 关于Disjob Disjob是天然为支持分布式长任务执行而设计的&#xff0c;它除了具备常规的任务调度功能外…

K210-关于K210基本操作

1.点亮RGB灯 from modules import ybrgb RGB ybrgb() #设置RGB颜色&#xff1a;RGB.set(r, g, b) #参数r控制红灯开关&#xff0c; #参数g控制绿灯开关&#xff0c; #参数b控制蓝灯开关&#xff0c; #输入0表示关闭&#xff0c;输入1表示开启。 RGB.set(1, 0, 0)2.按键功能 …

【Arduino实验笔记】机械臂篇(二) 开关控制LED灯

文章目录 前言硬件介绍实物接线图软件实现库函数介绍程序代码 下一篇的目标总结 前言 本章节介绍如何通过按键控制LED灯&#xff0c;在上一章节中&#xff0c;我们学习了如何控制IO输出电平。而本章节&#xff0c;我们将学会如何读取IO输入的电平。 硬件介绍 观察独立按键&am…

Taurus: 面向机器学习的数据面架构

日益复杂的网络和多样化的工作负载要求网络内置更多的自动化决策能力&#xff0c;通过可编程网络设备在用户面支持机器学习工作负载就是一个可能的选项&#xff0c;本文提出了一种支持用户面推理的架构设计&#xff0c;相对控制面机器学习的性能有数量级的提升。原文: Taurus: …

联想集团财报不及华尔街预期,财务业绩恐将继续恶化

来源&#xff1a;猛兽财经 作者&#xff1a;猛兽财经 华尔街对联想集团财报的预测 在联想集团&#xff08;00992&#xff09;公布2024财年第一季度财务业绩之前&#xff0c;华尔街分析师就曾预测&#xff0c;联想集团的收入和利润将实现强劲增长。 具体而言&#xff0c;根据S&…

VB6.0 设置窗体的默认焦点位置在 TextBox 中

文章目录 VB6.0 窗体的加载过程确定指针的焦点位置添加代码效果如下未设置指定焦点已设置焦点 VB6.0 窗体的加载过程 在VB6.0中&#xff0c;窗体&#xff08;Form&#xff09;加载时会触发多个事件&#xff0c;这些事件按照特定的顺序执行。下面是窗体加载过程中常见事件的执行…

高忆管理:光刻胶概念强势拉升,同益股份、格林达涨停

光刻胶概念5日盘中强势拉升&#xff0c;截至发稿&#xff0c;同益股份、格林达涨停&#xff0c;波长光电、晶瑞电材涨超7%&#xff0c;容大感光涨逾5%&#xff0c;华懋科技、茂莱光学、苏大维格、南大光电等均走强。 音讯面上&#xff0c;据新加坡《联合早报》网站9月2日报导&…

Android jni引用第三方so动态库和.a静态库并且调用(c)方法

最近花了一周时间来入门学习 Android JNI方面的知识&#xff0c;因为后续的工作很多需要用到c c库&#xff0c;我需要用jni来包装一下c函数&#xff0c;来提供给上次java调用。总之多学点知识对自己有好处。 案例效果&#xff1a; 上文我们讲解了 android studio cmake生成.a…

Python可复用函数的 6 种最佳实践

对于在一个有各种角色的团队中工作的数据科学家来说&#xff0c;编写干净的代码是一项必备的技能&#xff0c;因为&#xff1a; 清晰的代码增强了可读性&#xff0c;使团队成员更容易理解和贡献于代码库。 清晰的代码提高了可维护性&#xff0c;简化了调试、修改和扩展现有代码…

【机组】计算机系统组成课程笔记 第一章 计算机系统概论

1.1 计算机的基本概念 电子计算机是一种不需要人工直接干预&#xff0c;能够自动、高速、准确地对各种信息进行处理和存储的电子设备。 1.1.1 存储程序的工作方式 1. 单一的处理部件 2. 存储单元是定长的线性组织 3. 存储空间的单元直接寻址 4. 使用低级机器语言&#xf…

基于Hugo 搭建个人博客网站

目录 1.环境搭建 2.生成博客 3.设置主题 4.将博客部署到github上 1.环境搭建 1&#xff09;安装Homebrew brew是一个在 macOS 操作系统上用于管理软件包的包管理器。类似于centos下的yum或者ubuntu下的apt&#xff0c;它允许用户通过命令行安装、更新和管理各种软件工具、…

JavaScript 执行上下文和作用域链

1 执行上下文 执行上下文决定了变量和函数可以访问哪些数据。 一个执行上下文就对应一个仅后台可访问的变量对象&#xff0c;其中保存有该上下文的局部变量、参数和函数声明。 最外层的上下文称为全局上下文。宿主环境不同&#xff0c;全局上下文的关联对象就不同。在浏览器…