spring cloud、gradle、父子项目、微服务框架搭建---rabbitMQ延时队列(七)

news2025/1/17 2:47:41

总目录

https://preparedata.blog.csdn.net/article/details/120062997


文章目录

    • 总目录
    • 一、rabbit延时插件下载
    • 二、rabbit插件安装
    • 三、项目中配置延时队列
    • 四、定义消息通道
    • 五、生成消息
    • 六、监听消息,进行消费

延时队列的配置是对上片文章的延伸扩展
https://preparedata.blog.csdn.net/article/details/128647139


一、rabbit延时插件下载

查看 RabbitMQ 服务对应的版本号, 示例是3.8.9

访问github,找到对应的插件
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

下载扩展名为ez的插件包即可

在这里插入图片描述


二、rabbit插件安装

以windows安装为例,将插件包直接放到安装目录plugins文件夹下
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.9\plugins

打开dos窗口,直接启动插件

C:\Users\Administrator>rabbitmq-plugins enable rabbitmq_delayed_message_exchange

启动命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

然后再重启rabbit服务

可以打开页面的rabbit管理端,Type类型已经多了一个x-delayed-message,说明插件安装成功

在这里插入图片描述


三、项目中配置延时队列

延时队列,新增延时类型声明,其他配置和即时消费的队列一样,即时消费的队列不需要此配置

cloud:
    stream:
        rabbit:
            bindings:
                # 订单-生产者
                orderDelayChannelOutput:
                producer:
                    delayed-exchange: true
                # 订单-消费者
                orderDelayChannelInput:
                consumer:
                    delayed-exchange: true

下面是完整配置

cloud:
    stream:
      # 绑定消息中间件的
      binders:
        # rabbit0000 别名, 同时可以绑定多个不同类型的消息中间件  rabbit0001、rabbit0002、kafka0001
        rabbit0000:
          # 声明类型,是rabbit
          type: rabbit
          environment:
            spring:
              rabbitmq:
                # guest连接登录时,需要使用 localhost
                host: localhost
                # 管理端页面端口是:15672, 配置服务时:5672
                port: 5672
                username: guest
                password: guest
                # 虚拟主机命名空间,"/", 默认虚拟主机,  可以自定义(例如dev、test、prod),区分环境
                virtual-host: /
      # 绑定消息通道,生产通道、消费通道
      bindings:
        # 订单-生产者-延时队列  自定义通道名称
        orderDelayChannelOutput:
          # 生产者和消费者 连接的桥梁 Queues的名称
          destination: shopping.order.create.delay
          # 配置组
          group: shopping-order
        # 订单-消费者-延时队列  自定义通道名称
        orderDelayChannelInput:
          destination: shopping.order.create.delay
          group: shopping-order
      rabbit:
        bindings:
          # 订单-生产者
          orderDelayChannelOutput:
            producer:
              delayed-exchange: true
          # 订单-消费者
          orderDelayChannelInput:
            consumer:
              delayed-exchange: true

四、定义消息通道

和即时消费的队列,是相同的

package com.pd.shopping.order.mq;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface Channels {
    /**
     * 订单-消息生产者
     */
    String ORDER_DELAY_OUTPUT = "orderDelayChannelOutput";
    @Output(ORDER_DELAY_OUTPUT)
    MessageChannel orderDelayChannelOutput();

    /**
     * 订单-消息消费者
     */
    String ORDER_DELAY_INPUT = "orderDelayChannelInput";
    @Input(ORDER_DELAY_INPUT)
    SubscribableChannel orderDelayChannelInput();
}

五、生成消息

package com.pd.shopping.order.controller;

import com.pd.shopping.order.model.bo.OrderMsgBo;
import com.pd.shopping.order.mq.Channels;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.messaging

@RestController
@RequestMapping("/hello")
public class HelloController {
    @Autowired
    private Channels channels;

    @GetMapping("/testDelayedMq")
    public void testDelayedMq() {
        OrderMsgBo bo = new OrderMsgBo();
        bo.setId(1L);
        bo.setCode("aaa");

        //基础延时时间单位是毫秒
        Integer delayTime = 5 * 1000;

        Message<OrderMsgBo> message = MessageBuilder
                                    .withPayload(bo)
                                    .setHeader("x-delay", delayTime)
                                    .build();
        channels.orderDelayChannelOutput().send(message);

    }
}

延时队列需要在消息头中添加"x-delay", 并且给定一个延时时间,单位毫秒

上面代码逻辑是,发送一条消息成功后,不会立即消费,消息通道的消息停留5秒,然后再去消费


六、监听消息,进行消费

和即时消费的队列,是相同的

package com.pd.shopping.order.mq;

import com.pd.shopping.order.model.bo.OrderMsgBo;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@EnableBinding(Channels.class)
@Component
public class Listener {

    @StreamListener(Channels.ORDER_DELAY_INPUT)
    public void orderDelayInputListener(Message<OrderMsgBo> message) {
        OrderMsgBo orderMsgBo = message.getPayload();
        //todo 业务处理
    }
}


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/158791.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

paddledetection推理代码结构

https://github.com/PaddlePaddle/PaddleDetection/blob/release%2F2.5/deploy/pipeline/README.mdhttps://github.com/PaddlePaddle/PaddleDetection/blob/release%2F2.5/deploy/pipeline/README.md GitHub - leeguandong/Xiaobao: videoclip&#xff0c;视频剪辑应用videocl…

Go 1.19.3 error原理简析

Go error是一个很痛的话题(真心难用) 标准库 error 的定义 // The error built-in interface type is the conventional interface for // representing an error condition, with the nil value representing no error. type error interface {Error() string }error 是一个…

windows10安装wireshark

win10安装wireshark并使用windows10安装wireshark下载WIRESHARK下载Win10Pcapwindows10安装wireshark 你好&#xff01; 这是你第一次使用 Markdown编辑器 所展示的欢迎页。如果你想学习如何使用Markdown编辑器, 可以仔细阅读这篇文章&#xff0c;了解一下Markdown的基本语法知…

javaEE 初阶 — JUC(java.util.concurrent) 的常见类

文章目录1. Callable 接口1.1 Callable 的用法2. ReentrantLock2.1 ReentrantLock 的缺陷2.1 ReentrantLock 的优势3. 原子类4. 信号量 Semaphore5. CountDownLatch6. 相关面试题1. Callable 接口 类似于 Runnable 一样。 Runnable 用来描述一个任务&#xff0c;描述的任务没有…

【Spring源码】21. 关于循环依赖的N个问题

完成了applyMergedBeanDefinitionPostProcessors()方法&#xff0c;后面有一段关于判断Bean是否需要提前曝光的逻辑&#xff08;如下图红框框中部分&#xff09;在这段逻辑中涉及到了著名的循环依赖&#xff0c;提到循环依赖基本必讲三级缓存&#xff0c;好吧&#xff0c;这篇就…

CANOpen中SDO和PDO的COB-ID理解

CAN 总线是一种串行通信协议&#xff0c;具有较高的通信速率的和较强的抗干扰能力&#xff0c;可以作为现场总线应用于电磁噪声较大的场合。由于 CAN 总线本身只定义ISO/OSI 模型中的第一层&#xff08;物理层&#xff09;和第二层&#xff08;数据链路层&#xff09;&#xf…

(8)go-micro微服务Mysql配置

文章目录一 gorm介绍二 gorm安装1.1 下载依赖1.2 使用MySQL驱动三 CURD操作1. 查询1.1 单行查询1.2 多行查询2. 插入数据3. 更新数据4. 删除数据四 初始化连接五 使用六 最后一 gorm介绍 Go语言中的database/sql包提供了保证SQL或类SQL数据库的泛用接口&#xff0c;并不提供具…

redis: jedis连接超时(需要手动注入连接超时检测的配置)

相关版本说明 服务端&#xff1a; redis_version: 6.2.8 客户端&#xff1a; springBoot: 2.7.7 jedis: 3.8.0 问题 偶发redis连接超时&#xff0c;刷新就又好了&#xff0c;服务日志错误信息如下&#xff1a; JedisConnectionException: Unexpected end of stream.原因 …

Linux利用httpd搭建局域网yum源

本例环境&#xff1a;vmwareworkstation16 proCentOS7.9 mast节点&#xff1a;192.168.195.110 用于配置httpd并发布本地yum源 node节点&#xff1a;192.168.195.111 用于验证mast节点的yum源是否可用 思路&#xff1a;1.在mast节点挂载/上传镜像后配置本地yum源 2.利用本…

JSP三种脚本

脚本可以编写Java语句、变量、方法或表达式。 1.普通脚本 语法: <% Java代码%> <% page contentType"text/html;charsetUTF-8" language"java" %><html><head> <title>Title</title></head><body>&l…

对u盘的分区进行删除和格式化

一、说明 当usb盘&#xff0c;或者SD卡用作启动盘后&#xff0c;将出现多个盘符、多个分区&#xff1b;若将此盘重新当文件盘&#xff0c;需要删除以前的分区&#xff0c;并重新格式化后&#xff0c;才能使用。 二、使用Diskpart在Windows 10中对USB进行分区删除 2.1 尝试磁盘…

重启之后,台式机网络不能连接怎么办

目录 1.问题 2.排查过程 3.心得 1.问题 前天电脑意外断电后,再启动发现网络变成了未连接状态.查看本地连接显示已启动,但IPv4和IPv6未连接.当时做了一些尝试,没有收到效果,直到今天问题才得以解决. 2.排查过程 Windows网络诊断为:DNS服务器未响应.后来花了一部分时间在DNS…

ruoyi-vue集成magic-api(一)

ruoyi虽然带了强大的代码生成器&#xff0c;面对比较通用的CRUD还是游刃有余的&#xff0c;但在项目开发阶段&#xff0c;需求总是经常变化的&#xff0c;数据结构和逻辑也经常变化&#xff0c;我们需要的是快速验证功能逻辑&#xff0c;代码生成器可帮不上忙&#xff0c;每次需…

一、java编写登录功能

java编写登录功能 文章目录java编写登录功能前言编程学习记录一、登录逻辑简述二、代码实现1.创建USER表2.前端代码3.创建User类4.创建LoginServlet类5.创建JDBCUtils类6.创建UserDao类7.创建FailServlet类9.创建SuccessServlet 类11.配置tomcat 服务12.启动服务前言 编程学习…

SpringCloud Netfllix复习之Hystrix

文章目录写作背景Hystrix是什么Hystrix的核心功能上手实战RestTemplate整合HystrixOpenFeign整合HystrixOpenFeign与Hystrix整合的各种参数如何配置&#xff1f;源码验证基于HystrixCommand注解实现熔断源码分析初始化资源线程池的源码OpenFeign与Hystrix整合执行请求的源码写作…

Java多线程:创建多线程的“四种“ 方式

Java多线程&#xff1a;创建多线程的"四种" 方式 每博一文案 白马笑西风写道&#xff1a;江南有杨柳&#xff0c;有燕子&#xff0c;金鱼......汉人中有的是英俊勇武的少年&#xff0c;倜傥潇洒的少年...... 但这个美丽的姑娘就像故高昌国人那样固执&#xff1a;&qu…

buctoj-2023寒假集训-进阶训练赛(八)

问题 A: 分离出整数n从右边数第k个数字&#xff0c;递归实现 题目描述 在程序中定义一函数digit(n,k)&#xff0c;它能分离出整数n从右边数第k个数字。 输入 正整数n和k。 输出 第k个数字(若不存在则输出0&#xff09; 样例输入 31859 3 样例输出 8 #include<bits/stdc.h&g…

电商直播小程序核心功能有哪些?电商直播小程序代码分析

一个优质的电商直播小程序&#xff0c;必须带有后台管理&#xff0c;模块功能分工明确&#xff0c;可以让商家及时管理商品。在管理后台端又分为会员、商品、订单、店铺、直播、分销、优惠券、物流、数据等功能列表栏&#xff0c;基本功能较完善。下文小编将为大家讲解一下电商…

Linux命令行中 git 的使用

文章目录&#xff1a;什么是gitgitee新建仓库git提交代码1.同步远程仓库代码 - git pull2.查看本地仓库的状态 - git status3.添加代码到本地.git缓冲区 - git add4.推送代码到本地仓库.git中 - git commit5.同步本地仓库.git的内容到远程仓库 - git push什么是git Git 是一个…

2023年了,浏览器竟然还有新玩法,能看热搜能领券

在移动互联网时代&#xff0c;手机浏览器是手机中不可缺少的APP之一。我们经常使用手机浏览器查资料&#xff0c;看新闻&#xff0c;看小说等等。如今&#xff0c;手机浏览器的功能越来越强大&#xff0c;玩法也越来越多。最近&#xff0c;发现一款手机浏览器&#xff0c;竟然聚…