RabbitMq应用延时消息

news2025/1/16 8:21:09

一.建立绑定关系

package com.lx.mq.bind;

import com.lx.constant.MonitorEventConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author liuweiping.com
 * @version 1.0
 * @date 2023-06-26 10:04:03
 */
@Slf4j
@Configuration
public class MonitorRabbitMqBinding {

    @Value(value = "-${spring.profiles.active}")
    private String profile;

    /**
     * Description: 延迟消息 <br/>
     * Created By: liu wei ping <br/>
     * Creation Time: 2023年6月26日 下午6:59:43 <br/>
     * <br/>
     * @return <br/>
     */
    @Bean("delayExchange")
    public CustomExchange buildDelayedMessageNoticeExchange(){
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(MonitorEventConst.MONITOR_DELAYED_MESSAGE_EXCHANGE + profile, "x-delayed-message", Boolean.FALSE, Boolean.FALSE, args);
    }

    @Bean
    public Queue buildDelayedMessageNoticeQueue(){
        return QueueBuilder.durable(MonitorEventConst.MONITOR_DELAYED_MESSAGE_QUEUE + profile).build();
    }

    @Bean
    public Binding buildDelayedMessageNoticeBinding(){
        return BindingBuilder.bind(buildDelayedMessageNoticeQueue()).to(buildDelayedMessageNoticeExchange()).with(MonitorEventConst.MONITOR_DELAYED_MESSAGE_ROUTING_KEY).noargs();
    }

    /**
     * 交车完成事件消息定时处理队列
     */
    @Bean
    public Queue deliveryCompleteEventHandQueue() {
        return QueueBuilder.durable(MonitorEventConst.DELIVERY_COMPLETE_DELAYED_QUEUE + profile).build();
    }

    /**
     * 交车完成事件消息定时处理队列绑定
     */
    @Bean
    public Binding deliveryCompleteBinding() {
        return BindingBuilder.bind(deliveryCompleteEventHandQueue())
                .to(buildDelayedMessageNoticeExchange())
                .with(MonitorEventConst.DELIVERY_COMPLETE_DELAYED_ROUTING_KEY)
                .noargs();
    }

}

二.建立生产者

1.消息实体

package com.lx.dto.monitor;

import lombok.Data;

import java.util.Date;

/**
 * @author liuweiping.com
 * @version 1.0
 * @date 2023-06-26 10:11:06
 */
@Data
public class MonitorEventMessage {

    /**
     * 事件id
     */
    private String eventId;

    /**
     * 事件编码
     */
    private String eventCode;

    /**
     * 业务数据
     */
    private String businessUniqueKey;

    /**
     * 业务类型
     */
    private String businessType;

    /**
     * 到期时间
     */
    private Long expireMillis;

    /**
     *  时间处理唯一版本号
     */
    private Integer eventHandVersion;

    /**
     * 定时处理时间
     */
    private Date timedOperationTime;

    public void setTimedOperationTime(Date timedOperationTime) {
        this.timedOperationTime = timedOperationTime;
        expireMillis = timedOperationTime.getTime() - new Date().getTime();
        if (expireMillis < 0) {
            expireMillis = 0L;
        }
    }
}
package com.lx.mq.producer;

import com.lx.constant.MonitorEventConst;
import com.lx.designPattern.strategypattern.workorderbase.service.sysField.JsonUtil;
import com.lx.dto.monitor.MonitorEventMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * 监控事件消息发送类
 */
@Slf4j
@Component
public class MonitorEventMessageProducer {

    @Value(value = "-${spring.profiles.active}")
    private String profile;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     *  交车完成监控事件定时发送
     */
    public void sendDeliveryCompleteEventHandMessage(MonitorEventMessage monitorEventMessage) {
        String message = JsonUtil.toJson(monitorEventMessage);;
        rabbitTemplate.convertAndSend(MonitorEventConst.MONITOR_DELAYED_MESSAGE_EXCHANGE + profile,
                MonitorEventConst.DELIVERY_COMPLETE_DELAYED_ROUTING_KEY,
                message,
                msg -> {
                    msg.getMessageProperties().setDelay(monitorEventMessage.getExpireMillis().intValue());
                    return msg;
                });
        log.info("sending event processing messages: {}", message);//发送事件处理消息
    }

  
}

三.建立消费者

package com.lx.mq.consumer;

import com.lx.constant.MonitorEventConst;
import com.lx.designPattern.strategypattern.workorderbase.service.sysField.JsonUtil;
import com.lx.dto.monitor.MonitorEventMessage;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * 监控事件消息发送类
 */
@Slf4j
@Component
public class MonitorEventMessageConsumer {

    @Value(value = "-${spring.profiles.active}")
    private String profile;

    /**
     *  交车完成事件处理mq监听
     */
    @RabbitListener(queues = MonitorEventConst.DELIVERY_COMPLETE_DELAYED_QUEUE + "-${spring.profiles.active}")
    public void dealWithDeliveryCompleteEventHandMessage(String eventMessage, Channel channel, Message message) {
        log.info("dealWithDeliveryCompleteEventHandMessage:【{}】", JsonUtil.toJson(eventMessage));
        String str = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("Received the message of regular loading and unloading of goods: {}", str); //收到商品定时上下架消息
        MonitorEventMessage monitorEventMessage = JsonUtil.toBean(eventMessage, MonitorEventMessage.class);
        try {
            analyzeHand(monitorEventMessage);
        }catch (Exception e){
            log.error("交车完成事件分析失败,参数:{},e:{}",JsonUtil.toJson(monitorEventMessage),JsonUtil.toJson(e));
        }
    }

    /**
     *  事件分析
     * @param monitorEventMessage
     */
    private void  analyzeHand(MonitorEventMessage monitorEventMessage) throws Exception {

    }
}

四.测试类测试

package com.lx.controller;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import com.lx.conf.MQConfig;
import com.lx.conmon.ResultData;
import com.lx.dto.monitor.MonitorEventMessage;
import com.lx.mq.producer.MonitorEventMessageProducer;
import com.lx.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import com.lx.constant.RedisMange;
import com.lx.utils.RedisUtil;
import org.thymeleaf.util.DateUtils;


/**
 * @Author : liu wei ping
 * @CreateTime : 2019/9/3
 * @Description :
 **/

@RestController
public class SendMessageController {


    @Autowired
    private MonitorEventMessageProducer messageProducer;

 
    @GetMapping("/sendTopicMessage3")
    public ResultData<String> sendTopicMessage3() {
        MonitorEventMessage monitorEventMessage = new MonitorEventMessage();
        monitorEventMessage.setEventCode("delivery");
        //设置定时处理时间= 当前时间+ 定时处理时长
        monitorEventMessage.setTimedOperationTime(DateUtil.date(DateUtil.getCurrentMillis() + 30 * 1000));
        monitorEventMessage.setBusinessType("deliveryType");
        messageProducer.sendDeliveryCompleteEventHandMessage(monitorEventMessage);
        return new ResultData<>("ok");
    }
}

五.效果如图所示

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/711839.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

innovus: 如何让两个generate clock balance skew group?

如题&#xff0c;在sdc约束中两个generate clock被设置到同一个clock group中&#xff0c;工具虽然会生成对应的两个skew group&#xff0c;但是阅读ccoot_spec发现工具并没有让两个skew group进行balance&#xff0c;这时就需要手动设置balance skew group。 方法如下: 在读…

回归预测 | MATLAB实现WOA-DBN鲸鱼算法优化深度置信网络的多输入回归预测

回归预测 | MATLAB实现WOA-DBN鲸鱼算法优化深度置信网络的多输入回归预测 目录 回归预测 | MATLAB实现WOA-DBN鲸鱼算法优化深度置信网络的多输入回归预测效果一览基本介绍模型描述程序设计参考资料 效果一览 基本介绍 基于鲸鱼算法优化深度置信网络(WOA-DBN)的数据回归预测&…

微软AI太会了,示爱威胁PUA!

微软在以ChatGPT为基础的最新搜索引擎New Bing在公测仅一周后就引发了人们的担忧和恐惧。用户反馈&#xff0c;New Bing不仅会表现出类似示爱、PUA和威胁人类等人类特有的行为&#xff0c;还可能超越人类意志和价值观&#xff0c;并违反“阿西莫夫的机器人三定律”。这引起了人…

ODERBY的运行原理

定义表: CREATE TABLE t (id int(11) NOT NULL,city varchar(16) NOT NULL,name varchar(16) NOT NULL,age int(11) NOT NULL,addr varchar(128) DEFAULT NULL,PRIMARY KEY (id),KEY city (city) ) ENGINEInnoDB; SQL语句: select city,name,age from t where city杭州 orde…

【第二章 flutter学习之Dart介绍】

文章目录 前言一、Dart环境搭建安装 Dart Sdkvscode安装dart 前言 Dart是谷歌开发的计算机编程语言&#xff0c;诞生于2011&#xff0c;可以被用于web、服务器、移动应用、物联网应用的开发。要学习flutter必须会Dart 一、Dart环境搭建 安装 Dart Sdk 官网&#xff1a;https…

Kubernetes(k8s)容器编排Service

目录 1 Service概述1.1 为什么要有Service1.2 Service实现原理 2 Service 的类型3 Service示例3.1 准备工作3.1.1 创建deployment3.1.2 启动deployment3.1.3 访问测试 3.2 ClusterlP类型3.2.1 编辑资源清单3.2.2 应用Service3.2.3 访问测试3.2.4 删除Pod3.2.5 访问测试 3.3 Nod…

Python学习——类与对象

一、编程的两大思想 &#xff08;1&#xff09;面向过程 事物比较简单&#xff0c;用简单的线性思维即可解决 &#xff08;2&#xff09;面向对象 事物比较复杂&#xff0c;用简单的线性思维无法解决 &#xff08;3&#xff09;两者之间的关系 在面对复杂的问题时&#xff0c;宏…

gcc编译过程详解

以一个简单的C代码为例&#xff0c;详细讲解gcc整个编译过程。 1、预处理 主要处理#开头的东西&#xff0c;例如头文件处理、条件编译处理、将宏定义进行替换&#xff0c;还可以去掉注释、添加行号等。预处理的命令如下&#xff1a; gcc -E hello.c -o hello.i #-E表示预处理…

全志V3S嵌入式驱动开发(解决kernel 5.2.y wifi驱动问题)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 wifi模块&#xff0c;之前测试的时候&#xff0c;开发板上用的是esp 8089&#xff0c;当时内核时4.14.y&#xff0c;测试结果也是通过的。印象不是…

《C++ Primer》--学习10

反向迭代器 反向迭代器就是在容器中从尾元素向首元素反向移动的迭代器&#xff0c;递增一个反向迭代器会移动到前一个元素 反向迭代器需要递减运算符 我们只能从既支持也支持--的迭代器来定义反向迭代器&#xff0c;除了 forward_list 外的标准容器都支持 流迭代器不支持递减…

【分布式存储】聊聊共识和一致性

在分布式存储系统中&#xff0c;对于提高性能、可用性、可拓展性来说都有相关机制可以保证&#xff0c;比如复制、切片等&#xff0c;但是一旦涉及到分布式系统中选主的问题&#xff0c;就比较难&#xff0c;因为网络是不可靠的&#xff0c;并且可能还有拜占庭将军问题。所以如…

JAVA8-lambda表达式7:重要的函数接口

从什么是好代码讲起 最近又在看《clean code》&#xff0c;回顾了一下里面提到的整洁代码的标准。 然后审视了一下现在的项目代码&#xff0c;里面还有很多if&#xff0c;for循环。比如&#xff1a; // 查询用户列表 List<User> userList userService.list(); // 打印…

VSC++: 验证身份证

缘由https://ask.csdn.net/questions/1082358 void 验证身份证() {//缘由https://ask.csdn.net/questions/1082358int 权重[] { 7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8, 4, 2 }, 个 0, j 0, a 0, he 0;char M[] "10X98765432", 身份号[100][20]{};//…

如何备份 Kubernetes MySQL Operator 集群

Oracle 的MySQL Operator for Kubernetes 是在集群内自动化 MySQL 数据库配置的便捷方法。该运营商的主要功能之一是集成的自动备份支持,可提高您的弹性。备份会定期将数据库复制到外部存储。 本文将引导您完成设置到与 Amazon S3 兼容的对象存储服务的备份。您还将了解如何将…

C/C++结构体内存对齐的一些思考

在C中&#xff0c;结构体的内存对齐是为了提高访问结构体成员变量的效率和保证硬件的要求。 结构体对齐 C/C C 结构体内存对齐的示例代码C/C结构体内存对齐的原则结合汇编代码分析结构体的内存对齐问题 C 结构体内存对齐的示例代码 #include <iostream>struct Test_Stru…

运维必学 | 初识介绍-从零开始学Windows批处理(Batch)编程系列教程

欢迎关注「全栈工程师修炼指南」公众号 设为「星标⭐」每天带你 基础入门 到 进阶实践 再到 放弃学习&#xff01; 专注 企业运维实践、网络安全、系统运维、应用开发、物联网实战、全栈文章 等知识分享 “ 花开堪折直须折&#xff0c;莫待无花空折枝。 ” 作者主页&#xff1…

ACL 2023长文 | 基于能量超球体模型提升以事件为中心的结构化预测

论文标题&#xff1a; SPEECH: Structured Prediction with Energy-Based Event-Centric Hyperspheres 收录会议&#xff1a; ACL 2023 Main Conference 论文链接&#xff1a; https://arxiv.org/abs/2305.13617 开源链接&#xff1a; https://github.com/zjunlp/SPEECH 总述 以…

硬件知识:条码打印机5大接口类型介绍

目录 1、串口 2、并行接口 &#xff08;并口&#xff09; 3、USB接口 4、网口 5、PS/2接口 接口选择的不同&#xff0c;其打印输出的速度也不同。 条码打印机与计算机之间都是通过接口连接的&#xff0c;条码打印机常见的分为5种接口&#xff1a;串口&#xff08;也有称之…

分布式学习第五条 Nginx + FastDFS

使用nginx和FastDFS可以实现下载&#xff0c;类似百度网盘&#xff0c;需要对环境进行配置&#xff0c;nginx作为代理服务器&#xff0c;fastDFS负责执行上传下载操作。 1. 文件上传下载流程 文件上传流程 文件下载流程 优化 优化思路: 直接让客户端连接fastDFS的存储节点, …

Flutter之 Bloc实战实现购物车功能

Flutter之 Bloc实现购物车功能 前言商品列表模块状态设置UI设计业务逻辑测试代码购物车模块状态设置业务逻辑UI设计加入购物车测试代码参考资料前言 本篇以官方购物车项目为例,说明Bloc在Flutter的应用。该项目很简单,就两个模块,一个是商品列表页面模块catalog,一个是购物…