rabbitmq消息发送的可靠性:结合mysql来保证消息投递的可靠性

news2024/11/15 9:19:33

  • 消息从生产者到Broker,则会触发confirmCallBack回调
  • 消息从exchange到Queue,投递失败则会调用returnCallBack
  1. 用一张表来记录发送到mq的每一条消息,方便发送失败需要重试。status: 1-正常,0-重试,2-失败。
  2. 发送消息前,保存消息,并设置status为0和设置重试时间TryTime。
  3. 发送消息后:
  • 如果回调方法confirm接收到为true---表示发送成功,拿到msgId,修改数据库中status为1,表示消息投递成功。
  • 如果回调方法confirm接收到为false---失败不做任何操作,此时消息状态status还是0,而0表示重试的状态。

4. 设置一个定时任务:10秒执行一次. 根据(status)状态0并且重试时间(try_time)<        sysdate()当前时间查询出List<RabbitmqSendLog> rabbitmqSendLogs消息的数量rabbitmqSendLogs

5.遍历所有消息rabbitmqSendLogs.forEach

  • 如果重试次数count>=3,则直接设置这条消息发送失败status=2
  •  如果重试次数count<3, 则根据message_id和获取当前时间更新date来更新这条消息重试次数count+1;
  •  获取消息体content,并讲获取到的json字符串解析成对象,然后通过rabbitTemplate把对象投递到rabbitmq

数据表rabbitmq_send_log

 

 对应的实体类

@Data
@TableName("rabbitmq_send_log")
public class RabbitmqSendLog {
//    @TableId(type = IdType.AUTO)
    private String messageId;
    private String content;
    private Integer status;
    private String routeKey;
    private String exchange;
    private Integer count;
    private Date tryTime;
    @TableField(fill = FieldFill.INSERT)
    private Date createTime;
    @TableField(fill = FieldFill.INSERT_UPDATE)
    private Date updateTime;
}
server:
  port: 8071

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: 123456
    virtual-host: /ems
    template:
      retry: #重试,消息发送失败会重试
        enabled: true # 开启重试
        initial-interval: 10000ms  #第一次十秒重试
        max-interval: 80000ms  #最后一次是八秒重试
        multiplier: 2  #重试翻倍率
    publisher-confirms: true #发送者开启 confirm 确认机制
    publisher-returns: true  # 发送者开启 return 确认机制

  datasource:
    url: jdbc:mysql://localhost:3306/one?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&AllowPublicKeyRetrieval=True
    username: root
    password: root


mybatis-plus:
  mapper-locations: classpath*:com/test/mapper/xml/*.xml
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
    map-underscore-to-camel-case: true
  type-aliases-package: com.test.domain

配置类

package com.test.config;


import com.test.mapper.RabbitmqSendLogMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

@Slf4j
@Configuration
public class RabbitCallbackConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Resource
    private RabbitmqSendLogMapper rabbitmqSendLogMapper;

    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //rabbitTemplate发送消息json转换配置
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
        return rabbitTemplate;
    }

    /**
     * 配置接收消息json转换为对象
     *
     * @return
     */
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    // 下边这样写也可以
    // @Autowired
    // private RabbitTemplate rabbitTemplate;
    // @PostConstruct
    // public void init() {
    //     rabbitTemplate.setMandatory(true);
    //     rabbitTemplate.setReturnCallback(this);
    //     rabbitTemplate.setConfirmCallback(this);
    // }
//消息从生产者到Broker,则会触发confirmCallBack回调
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            log.error("confirm==>发送到broker.exchange失败\r\n" +
                            "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
                    correlationData, ack, cause);
        } else {
            log.info("confirm==>发送到broker.exchange成功\r\n" +
                            "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
                    correlationData, ack, cause);
            String msgId = correlationData.getId();
            Integer status = 1;
            //拿到msgId,修改数据库中status为1,表示消息投递成功
            rabbitmqSendLogMapper.updateRabbitmqSendLogStatus(msgId, status);
            //update rabbitmq_send_log set status = #{status} where message_id=#{msg}
            System.out.println("消息投递成功->msgId: " + msgId);

        }
    }

    //消息从exchange到Queue,投递失败则会调用returnCallBack
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        log.info("returnedMessage==> \r\n" + "message={}\r\n" + "replyCode={}\r\n" +
                        "replyText={}\r\n" + "exchange={}\r\n" + "routingKey={}",
                message, replyCode, replyText, exchange, routingKey);
    }
}

定时任务,每隔10秒冲数据库查询,然后再从新投递到mq。

package com.test.task;

import com.alibaba.fastjson.JSON;
import com.test.domain.RabbitmqSendLog;
import com.test.domain.Student;
import com.test.mapper.RabbitmqSendLogMapper;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;
import java.util.List;

/**
 * Created by IntelliJ IDEA.
 *
 * @Author : Yang Kai
 * @create 2022/12/21 15:43
 */
@Component
@EnableScheduling
public class RabbitSendTask {
    @Resource
    private RabbitmqSendLogMapper rabbitmqSendLogMapper;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //每隔十秒执行一次
    @Scheduled(cron = "0/10 * * * * ?")
    public void messageSendTask() {
        //select * from rabbitmq_send_log where status=0 and try_time &lt; sysdate()
        List<RabbitmqSendLog> rabbitmqSendLogs = rabbitmqSendLogMapper.getMsgStatus();
        rabbitmqSendLogs.forEach(s->{
            //3表示:如果重试次数大于或等于3
            if (s.getCount()>=3){
                //update rabbitmq_send_log set status = #{status} where message_id=#{msgId}
                rabbitmqSendLogMapper.updateRabbitmqSendLogStatus(s.getMessageId(),2);//直接设置这条消息发送失败
            }else {
                System.out.println("重试消息:  "+s);
                //更新重试消息的次数
                //update rabbitmq_send_log set count=count+1,update_time=#{date} where message_id=#{messageId}
                rabbitmqSendLogMapper.updateCount(s.getMessageId(),new Date());
                //拿到重试消息的json消息体,把json转换为消息对象
                Student student = JSON.parseObject(s.getContent(), Student.class);
                //投递消息
                rabbitTemplate.convertAndSend(s.getExchange(),s.getRouteKey(),student,new CorrelationData(s.getMessageId()));
            }
        });
    }
}

 接收rabbitmq消息的类

package com.test.service;

import com.rabbitmq.client.Channel;
import com.test.domain.Student;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Slf4j
@Component
public class WorkCustomer {


    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "queue.name.user", declare = "true"), // 创建info队列,declare默认队列持久化
                    key = {"route.user"}, // 路由key
                    exchange = @Exchange(type = "direct", name = "exchange-directs-user")
            )})
    public void receive12211(Student student, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        System.out.println("路由模式message1 = " + student);
    }


}

发送消息

  @Test
    public void contextLoadsx() {
        String msgId = UUID.randomUUID().toString();
        System.out.println("1msgId:"+msgId);
        RabbitmqSendLog rabbitmqSendLog = new RabbitmqSendLog();
        rabbitmqSendLog.setMessageId(msgId);
        rabbitmqSendLog.setExchange("exchange-directs-user");
        rabbitmqSendLog.setRouteKey("route.user");
        rabbitmqSendLog.setCount(1);
        rabbitmqSendLog.setStatus(0);
        //1分钟后重试的时间
        rabbitmqSendLog.setTryTime(new Date(System.currentTimeMillis()+1000+60+1));
        Student student = new Student();
        student.setAddress("上海");
        student.setAge("12");
        student.setName("小明");
        String s = JSON.toJSONString(student);
        rabbitmqSendLog.setContent(s);
        int insert = rabbitmqSendLogMapper.insert(rabbitmqSendLog);
        try {
            rabbitTemplate.convertAndSend("exchange-directs-user", "route.user", student,new CorrelationData(msgId));
        }catch (Exception e){
            //如果mq网络原因.发送邮件,活着其他方式通知
//            rabbitmqSendLogMapper.insert(rabbitmqSendLog);
        }
        
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/134702.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【计算机视觉】OpenCV 4高级编程与项目实战(Python版)【1】:图像处理基础

目录 1. OpenCV简介 2. OpenCV开发环境搭建 3. 读取图像 4. 读取png文件出现警告 5. 显示图像 6. 保存图像 7. 获取图像属性 本系列文章会深入讲解OpenCV 4&#xff08;Python版&#xff09;的核心技术&#xff0c;并提供了大量的实战案例。这是本系列文章的第一篇&…

简单了解计算机的工作原理

文章目录一.计算机操作系统二.进程/任务三、进程控制块抽象(PCB)四、进程调度相关属性五、内存管理一.计算机操作系统 概念:操作系统是一组做计算机资源管理的软件的统称. 目前常见的操作系统有&#xff1a;Windows系列、Unix系列、Linux系列、OSX系列、Android系列、iOS系列…

百度安全在线查询,网站弹出风险提示怎么处理

站长们要避免网站打开弹出风险提示&#xff0c;需要要时刻关注自己的网站是否存在风险&#xff0c;时刻知道自己的网站是不是安全的。 百度安全在线查询步骤&#xff1a; 1、打开站长工具 2、添加需要查询的网站域名。 3、勾选百度安全。 4、点击开始查询。 等…

22个Python的万用公式分享

在大家的日常python程序的编写过程中&#xff0c;都会有自己解决某个问题的解决办法&#xff0c;或者是在程序的调试过程中&#xff0c;用来帮助调试的程序公式。小编通过几十万行代码的总结处理&#xff0c;总结出了22个python万用公式&#xff0c;可以帮助大家解决在日常的py…

再学C语言22:循环控制语句——循环嵌套和数组处理

嵌套循环&#xff08;nested loop&#xff09;&#xff1a;在一个循环内使用另一个循环 一、循环嵌套 示例代码&#xff1a; #include <stdio.h> int main(void) {int i;int j;for(i 0; i < 10; i){for(j 0; j < 9; j){printf("%5d", j); // 里面的…

共享模型之管程(二)

1.Moniter对象 1.1.Java对象头 1>.以32位虚拟机为例 ①.普通对象 Klass Word表示对象的类型,它是一个指针,指向了对象所从属的class; ②.数组对象 在32位虚拟机中,integer包装类型的长度为12个字节,而int基本数据类型的长度为4个字节; 其中Mark Word结构为: 2>.64位…

shell第一天练习

题目&#xff1a; 1、在当前主机编写脚本文件history_max.sh显示主机中执行频率最高的前5个命令。 2、判断主机是否存在rhel用户&#xff0c;如果存在则设置密码为redhat,如果不存在则创建用户并设置密码。 3、通过设置变量HISTTIMEFORMAT&#xff0c;使得当执行history命令时…

16. BootStrap

文章目录一、Bootstrap1、概念2、快速入门二、响应式布局三、CSS样式和JS插件1、全局CSS样式2、组件1. 导航条2. 分页条3、插件1. 轮播图四、案例1、案例描述2、案例分析3、实现一、Bootstrap 1、概念 * 概念&#xff1a; 一个前端开发的框架&#xff0c;Bootstrap&#xff0…

Linux网络配置(如何设置静态IP?如何设置查看主机名?)

文章目录Linux网络配置一、网络地址配置1.1. 查看网络地址1.2. 测试两个地址是否连接1.3. Linux系统的网络配置二、主机名以及hosts映射2.1. 查看和设置主机名2.2. hosts映射2.3. DNSLinux网络配置 一、网络地址配置 如果在一台Windows电脑上安装了Linux虚拟机&#xff0c;那…

颤抖开篇,从php角度谈谈IO模型(BIO)

颤抖开篇&#xff0c;从php角度谈谈IO模型&#xff08;BIO&#xff09; IO 是什么? 在计算机系统中I/O就是输入&#xff08;input&#xff09;和输出&#xff08;Output&#xff09;的意思。针对不同的操作对象&#xff0c;可以划分为磁盘I/O模型&#xff0c;网络I/O模型&am…

开发神器VSCode配置C/C++编译环境

hi&#xff0c;小伙伴们大家好&#xff0c;今天给大家介绍一款程序员常用的开发神器VSCode&#xff0c;想必大家肯定有所了解&#xff0c;也有很多小伙伴在日常工作中经常使用。当木荣君初次见到VSCode时&#xff0c;真正的被它惊艳到了&#xff0c;可以说是一见钟情。从此就爱…

13.6-14.8读书笔记

13.6 对象移动 13.6.1 右值引用 概念: 为了支持移动操作,新标准引入了的一种新的引用类型.所谓右值引用就是必须绑定到右值的引用. 通过&&来获得右值引用 int i 42;int &r i;int &&rr i; // 错误,不能将一个右值引用绑定到一个左值上int &r3 …

【python基础_05】面向对象

文章目录1. 类和对象1.1 使用对象组织数据的模版1.2 成员变量和成员方法1.3 实现代码2. 内置方法&#xff08;魔术方法&#xff09;2.1 构造方法&#xff1a;__init__&#xff08;&#xff09;1. 类和对象 1.1 使用对象组织数据的模版 1.2 成员变量和成员方法 1.3 实现代码 1…

jupyter notebook无法启动内核

jupyter notebook无法启动内核问题概述方法一使用Window PowerShell方法二更改文件路径重新启动内核参考问题概述 遇到的问题是在使用jupyter的时候无法正常运行,所以在这里尝试一些办法,在这里进行记录,希望能够帮助到大家 方法一 使用Window PowerShell 首先第一个方法就…

Java IO流 - 释放资源的方式

资源释放的方式 书接上文, 在上一篇文章我们做过一个文件拷贝的练习, 但是在联系中是有释放资源隐患的的, 例如在下面代码中, 在文件释放之前有许多行的逻辑代码; 如果这许多行的逻辑代码有报错, 导致程序不运行, 那么资源就得不到释放 public static void main(String[] args)…

Crack:ActiveReportsJS 3.2.2 EN:ActiveReportsJS

ActiveReportsJS - 高级 JavaScript 报告解决方案 ActiveReportsJS 是一种用于在前端应用程序中可视化数据的报告解决方案。Ω578867473自定义报告布局并将我们的报告设计器和查看器组件集成到 Web 应用程序中&#xff0c;以便在任何平台上预览、导出或打印报告。 使用我们的跨…

WPF+ASP.NET SignalR实现动态折线图

在实际业务中&#xff0c;当后台数据发生变化&#xff0c;客户端能够实时的收到通知&#xff0c;而不是由用户主动的进行页面刷新才能查看&#xff0c;这将是一个非常人性化的设计。有没有那么一种场景&#xff0c;后台数据明明已经发生变化了&#xff0c;前台却因为没有及时刷…

ElementUI——案例2用户管理(基于SpringBoot实现增删改)

1.ElementUI整合SpringBoot前后端分离实现用户增删改查 效果展示 2.前端核心代码 项目目录 main.js引入 import ElementUI from element-ui; import element-ui/lib/theme-chalk/index.css; import router from ./router import axios from axiosVue.prototype.$http ax…

SQL 存储过程

文章目录存储过程简介存储过程的创建及调用存储过程的删除 如何删除存储过程存储过程的优缺点现需要向学生表中插入新的学生数据。但在插入学生数据的时&#xff0c;需要同 时检查老师表里的数据。如果插入学生的老师不在老师表里&#xff0c;则先向老师表中插入一条老师数据&a…

高级IO-多路转接

高级IO 以前的都是拷贝接口。write什么的就是将字符串拷贝到发送缓冲区中。 应用层等待接收缓冲区填写数据的过程算是IO吗&#xff1f;算 IO等待拷贝数据&#xff1b; 真正的IO的过程就是拷贝的过程。比如等待鱼上钩的时候也算是钓鱼(adj)&#xff0c;当把鱼拿上来的时候也…