【RabbitMQ实战】邮件发送(直连交换机、手动ack)

news2024/7/8 1:19:39

一、实现思路

二、异常情况测试现象及解决

在这里插入图片描述

说明:本文涵盖了关于RabbitMQ很多方面的知识点, 如:
消息发送确认机制 、消费确认机制 、消息的重新投递 、消费幂等性,

二、实现思路
1.简略介绍163邮箱授权码的获取
2.编写发送邮件工具类
3.编写RabbitMQ配置文件
4.生产者发起调用
5.消费者发送邮件
6.定时任务定时拉取投递失败的消息, 重新投递
7.各种异常情况的测试验证
8.拓展: 使用动态代理实现消费端幂等性验证和消息确认(ack)

三、 代码实现

配置版本如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.atguigu.gulimall</groupId>
    <artifactId>provider-and-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>provider-and-consumer</name>
    <description>Demo project for Spring Boot</description>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <java.version>1.8</java.version>
        <!--        <spring-cloud.version>2021.0.4</spring-cloud.version>-->
        <spring-cloud.version>2021.0.1</spring-cloud.version>
    </properties>
    <dependencies>
        <!--joda time  ? 这个还有些问题,这个类库是做什么的-->
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.10</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-mail</artifactId>
        </dependency>
        <dependency>
            <groupId>com.atguigu.gulimall</groupId>
            <artifactId>gulimall-common</artifactId>
            <version>0.0.1-SNAPSHOT</version>
            <exclusions>
                <exclusion>
                    <artifactId>servlet-api</artifactId>
                    <groupId>javax.servlet</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--什么作用? -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-collections4</artifactId>
            <version>4.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.4.2</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/java</directory><!--所在的目录-->
                <includes><!--包括目录下的.properties,.xml文件都会扫描到-->
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
    </build>

</project>

完整代码可以参考我的GitHub, https://gitee.com/zhai_jiahao/gulimall

代码实现
1.163邮箱授权码的获取, 如图:
在这里插入图片描述
每次启用授权码的时候,就会出现一行字符串,其实就是三方发送邮件的时候,使用的密码(该授权码就是配置文件spring.mail.password需要的密码)

项目结构
在这里插入图片描述

1、rabbitmq、邮箱配置:

server:
  port: 8023

#数据源配置
spring:
  datasource:
    url: jdbc:mysql://192.168.56.10:3306/gulimall_ums
    username: root
    password: root
    driver-class-name:  com.mysql.cj.jdbc.Driver
  #配置nacos
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
  #配置服务名称
  application:
    name: provider-and-consumer
  # 配置rabbitMq 服务器
  #spring.application.name=rabbitmq-consumer-true
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    #虚拟host 可以不设置,使用server默认host
    virtual-host: /
    publisher-returns: true  #确认消息已发送到队列(Queue)  这个在生产者模块配置 这个后期再配置,这会还用不到
    publisher-confirm-type: correlated   #确认消息已发送到交换机(Exchange) 这个在生产者模块配置 这个后期再配置,这会还用不到
    listener:  #这个在测试消费多个消息的时候,不能有下面这些配置,否则只能消费一个消息后就不继续消费了
      simple:
        acknowledge-mode: manual  #指定MQ消费者的确认模式是手动确认模式  这个在消费者者模块配置  设置手动确认(ack)
        prefetch: 1 #一次只能消费一条消息   这个在消费者者模块配置

  #配置mail
  mail:
    host: smtp.163.com
    username: 15131650119@163.com
    from: 15131650119@163.com
    password: GTMCFUFBTNZERDJA
    default-encoding: UTF-8
    properties:
      mail:
        stmp:
          auth: true
          starttls:
            enable: true
            required: true


#配置日志输出级别
logging:
  level:
    com.atguigu.gulimall: debug   #level 日志等级 指定命名空间的日志输出
  pattern:
    console: "%d %-5level %logger : %msg%n"
    file: "%d %-5level [%thread] %logger : %msg%n"
  file:
    name: d://spring/log



说明: password即授权码, username和from要一致

2、表结构

CREATE TABLE `msg_log` (
  `msg_id` varchar(255) NOT NULL DEFAULT '' COMMENT '消息唯一标识',
  `msg` text COMMENT '消息体, json格式化',
  `exchange` varchar(255) NOT NULL DEFAULT '' COMMENT '交换机',
  `routing_key` varchar(255) NOT NULL DEFAULT '' COMMENT '路由键',
  `status` int(11) NOT NULL DEFAULT '0' COMMENT '状态: 0投递中 1投递成功 2投递失败 3已消费',
  `try_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数',
  `next_try_time` datetime DEFAULT NULL COMMENT '下一次重试时间',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_time` datetime DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`msg_id`),
  UNIQUE KEY `unq_msg_id` (`msg_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息投递日志';

select * from msg_log t order by t.create_time  desc;



说明: exchange routing_key字段是在定时任务重新投递消息时需要用到的

后面会用到的sql(设置时区使用)

#查询需要定时任务处理的数据
select msg_id, msg, exchange, routing_key, status, try_count,
next_try_time, create_time, update_time,SYSDATE(), now()  from msg_log where status = 0 and next_try_time <= now() 

#设置时区
SELECT @@global.time_zone;
SET GLOBAL time_zone = 'Asia/Shanghai';

3、启动类、服务接口、服务接口实现类

启动类ProviderAndConsumerApplication

package com.atguigu.gulimall.providerconsumer;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.scheduling.annotation.EnableScheduling;


/**
 * MQ消息发送邮件功能实战(博客地址:https://blog.csdn.net/onceing/article/details/126407845)
 */

@EnableScheduling   //设置能使用定时任务
@EnableDiscoveryClient
@SpringBootApplication
@MapperScan("com.atguigu.gulimall.providerconsumer.mapper")
public class ProviderAndConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProviderAndConsumerApplication.class, args);
    }

}

4、TestController 向队列中入消息的入口

	package com.atguigu.gulimall.providerconsumer.controller;

import com.atguigu.gulimall.providerconsumer.common.ServerResponse;
import com.atguigu.gulimall.providerconsumer.pojo.Mail;
import com.atguigu.gulimall.providerconsumer.service.TestService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.Errors;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 *
 * 测试入库控制器类
 * @author: jd
 * @create: 2024-06-28
 */

@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {

    @Autowired
    private TestService testService;


    /**
     * 发送邮件
     * @param mail 邮件对象
     * @param errors JSR303验证结果错误对象  ,(猜测是可以拿到验证的错误信息的用于返回校验的提示)
     * @return
     */
    @PostMapping("/send")
    public ServerResponse sendMail(@RequestBody @Validated Mail mail, Errors errors){
        if(errors.hasErrors()){
            String defaultMessage = errors.getFieldError().getDefaultMessage();
            return ServerResponse.error(defaultMessage);
        }
        return testService.send(mail);
    }

}

5、消息生产接口 TestService.java

package com.atguigu.gulimall.providerconsumer.service;

import com.atguigu.gulimall.providerconsumer.common.ServerResponse;
import com.atguigu.gulimall.providerconsumer.pojo.Mail;

/**
 * 消息生产接口
 */
public interface TestService {
    ServerResponse testIdempotence();

    ServerResponse accessLimit();

    ServerResponse send(Mail mail);

}

TestServiceImpl.java

package com.atguigu.gulimall.providerconsumer.service.impl;

import com.atguigu.gulimall.providerconsumer.common.ResponseCode;
import com.atguigu.gulimall.providerconsumer.common.ServerResponse;
import com.atguigu.gulimall.providerconsumer.config.RabbitConfig;
import com.atguigu.gulimall.providerconsumer.mapper.MsgLogMapper;
import com.atguigu.gulimall.providerconsumer.mq.MessageHelper;
import com.atguigu.gulimall.providerconsumer.pojo.Mail;
import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;
import com.atguigu.gulimall.providerconsumer.service.TestService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

/**
 * 消息生产接口实现类
 * @author: jd
 * @create: 2024-06-27
 */
@Service
@Slf4j
public class TestServiceImpl  implements TestService {

    @Autowired
    private MsgLogMapper msgLogMapper;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public ServerResponse testIdempotence() {
        return ServerResponse.success("testIdempotence: success");
    }

    @Override
    public ServerResponse accessLimit() {
        return ServerResponse.success("accessLimit: success");
    }

    @Override
    public ServerResponse send(Mail mail) {
        // 1. 生产唯一业务标识
        String msgId = String.valueOf(UUID.randomUUID());  //业务的唯一标识
        mail.setMsgId(msgId);

        //2.记录日志
        MsgLog msgLog = new MsgLog(msgId, mail, RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME);
        msgLogMapper.insertMsgLog(msgLog);// 消息入库  先记录日志

        //3.真正发送消息到MQ中
        CorrelationData correlationData = new CorrelationData(msgId);
        rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME,
                MessageHelper.objToMsg(mail), correlationData);// 发送消息

        log.info("====================>消息已发送队列");
        //返回公共的响应结果
        return ServerResponse.success(ResponseCode.MAIL_SEND_SUCCESS.getMsg());
    }
}

MsgLogMapper.java

package com.atguigu.gulimall.providerconsumer.mapper;

import com.atguigu.gulimall.providerconsumer.batch.BatchProcessMapper;
import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;
import org.apache.ibatis.annotations.Mapper;

import java.util.List;

/**
 * 日志操作mapper接口
 */
@Mapper
public interface MsgLogMapper  extends BatchProcessMapper<MsgLog> {

    /**
     * 记录消息日志
     * @param msgLog
     */
    void insertMsgLog(MsgLog msgLog);

    /**
     * 更新消息日志状态
     * @param msgLog
     */
    void updateStatus(MsgLog msgLog);

    /**
     * 查询超时消息
     * @return
     */
    List<MsgLog> selectTimeoutMsg();

    /**
     * 更新尝试的次数
     * @param msgLog
     */
    void updateTryCount(MsgLog msgLog);

    /**
     * 通过主键筛选出消息日志对象
     * @param msgId
     * @return
     */
    MsgLog selectByPrimaryKey(String msgId);

}

MsgLogMapper.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.atguigu.gulimall.providerconsumer.mapper.MsgLogMapper" >
    <resultMap id="BaseResultMap" type="com.atguigu.gulimall.providerconsumer.pojo.MsgLog" >
        <id column="msg_id" property="msgId" jdbcType="VARCHAR" />
        <result column="msg" property="msg" jdbcType="VARCHAR" />
        <result column="exchange" property="exchange" jdbcType="VARCHAR" />
        <result column="routing_key" property="routingKey" jdbcType="VARCHAR" />
        <result column="status" property="status" jdbcType="INTEGER" />
        <result column="try_count" property="tryCount" jdbcType="INTEGER" />
        <result column="next_try_time" property="nextTryTime" jdbcType="TIMESTAMP" />
        <result column="create_time" property="createTime" jdbcType="TIMESTAMP" />
        <result column="update_time" property="updateTime" jdbcType="TIMESTAMP" />
    </resultMap>

    <sql id="Base_Column_List" >
        msg_id, msg, exchange, routing_key, status, try_count, next_try_time, create_time, update_time
    </sql>


    <insert id="insertMsgLog" parameterType="com.atguigu.gulimall.providerconsumer.pojo.MsgLog">
        INSERT INTO msg_log(msg_id, msg, exchange, routing_key, status, try_count, next_try_time, create_time, update_time)
        VALUES (#{msgId}, #{msg}, #{exchange}, #{routingKey}, #{status}, #{tryCount}, #{nextTryTime}, #{createTime}, #{updateTime})
    </insert>

    <update id="updateStatus" parameterType="com.atguigu.gulimall.providerconsumer.pojo.MsgLog">
        update msg_log set status = #{status}, update_time = now()
        where msg_id = #{msgId}
    </update>

    <select id="selectTimeoutMsg" resultMap="BaseResultMap">
        select <include refid="Base_Column_List"/>
        from msg_log
        where status = 0
        and next_try_time &lt;= now()
    </select>

    <update id="updateTryCount">
        update msg_log set try_count = try_count + 1, next_try_time = #{nextTryTime}, update_time = now()
        where msg_id = #{msgId}
    </update>

    <select id="selectByPrimaryKey" parameterType="java.lang.String" resultMap="BaseResultMap">
        select
        <include refid="Base_Column_List" />
        from msg_log
        where msg_id = #{msgId,jdbcType=VARCHAR}
    </select>
</mapper>

MsgLogService.java

package com.atguigu.gulimall.providerconsumer.service;



import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;

import java.util.Date;
import java.util.List;

/**
 * 日志记录接口类
 */
public interface MsgLogService {

    void updateStatus(String msgId, Integer status);

    MsgLog selectByMsgId(String msgId);

    List<MsgLog> selectTimeoutMsg();

    void updateTryCount(String msgId, Date tryTime);
}

MsgLogServiceImpl.java 消息日志操作实现类

package com.atguigu.gulimall.providerconsumer.service.impl;

import com.atguigu.gulimall.providerconsumer.mapper.MsgLogMapper;
import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;
import com.atguigu.gulimall.providerconsumer.service.MsgLogService;
import com.atguigu.gulimall.providerconsumer.util.JodaTimeUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.List;

/**
 * 消息日志操作实现类
 * @author: jd
 * @create: 2024-06-27
 */
@Service
public class MsgLogServiceImpl implements MsgLogService {

    @Autowired
    private MsgLogMapper msgLogMapper;

    @Override
    public void updateStatus(String msgId, Integer status) {
        MsgLog msgLog = new MsgLog();
        msgLog.setMsgId(msgId);
        msgLog.setStatus(status);
        msgLog.setUpdateTime(new Date());
        msgLogMapper.updateStatus(msgLog);
    }

    @Override
    public MsgLog selectByMsgId(String msgId) {
        return msgLogMapper.selectByPrimaryKey(msgId);
    }

    @Override
    public List<MsgLog> selectTimeoutMsg() {
        return msgLogMapper.selectTimeoutMsg();
    }

    @Override
    public void updateTryCount(String msgId, Date tryTime) {
        //获取下一次重发发送时间,上一次发送时间 加一分钟
        Date nextTryTime = JodaTimeUtil.plusMinutes(tryTime, 1);

        //构建消息对象
        MsgLog msgLog = new MsgLog();
        msgLog.setMsgId(msgId);
        msgLog.setNextTryTime(nextTryTime);  //设置下一次消息重发时间

        msgLogMapper.updateTryCount(msgLog);
    }
}

通用BatchProcessMapper.java 所有的mapper可以继承的

package com.atguigu.gulimall.providerconsumer.batch;

import java.util.List;

/**
 * 通用manpper接口
 * @param <T>
 */
public interface BatchProcessMapper<T> {
    void batchInsert(List<T> list);

    void batchUpdate(List<T> list);
}

通用manpper接口实现类 MapperProxy

package com.atguigu.gulimall.providerconsumer.batch.mapperproxy;

import com.atguigu.gulimall.providerconsumer.batch.BatchProcessMapper;
import com.google.common.collect.Lists;
import org.apache.commons.collections4.CollectionUtils;

import java.util.List;

import static com.atguigu.gulimall.providerconsumer.common.Constant.MAX_SIZE_PER_TIME;

/**
 * 通用manpper接口实现类
 * @author: jd
 * @create: 2024-06-27
 */
public class MapperProxy<T> implements BatchProcessMapper<T> {
    private BatchProcessMapper batchProcessMapper;

    public MapperProxy(BatchProcessMapper batchProcessMapper) {
        this.batchProcessMapper = batchProcessMapper;
    }

    @Override
    public void batchInsert(List<T> list) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }

        List<List<T>> partition = Lists.partition(list, MAX_SIZE_PER_TIME);
        for (List<T> batchList : partition) {
            batchProcessMapper.batchInsert(batchList);
        }
    }

    @Override
    public void batchUpdate(List<T> list) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }

        List<List<T>> partition = Lists.partition(list, MAX_SIZE_PER_TIME);
        for (List<T> batchList : partition) {
            batchProcessMapper.batchUpdate(batchList);
        }
    }

}

常量类 Constant.java

package com.atguigu.gulimall.providerconsumer.common;

import java.util.Arrays;
import java.util.stream.Collectors;

/**
 * 常量 、枚举类
 * @author: jd
 * @create: 2024-06-27
 */
public class Constant {

    public static final int MAX_SIZE_PER_TIME = 1000;
    public static final int INDEX_ZERO = 0;
    public static final int INDEX_ONE = 1;
    public static final int INDEX_TWO = 2;
    public static final int INDEX_THREE = 3;

    public static final int NUMBER_ZERO = 0;
    public static final int NUMBER_ONE = 1;

    public static final String COLON = ":";
    public static final String COMMA = ",";
    public static final String DOUBLE_STRIGULA = "--";
    public static final String REPLACEMENT_TARGET = "-99999%";

    public static final String UNKNOWN_TYPE = "未知类型";

    public interface Redis {
        String OK = "OK";
        // 过期时间, 60s, 一分钟
        Integer EXPIRE_TIME_MINUTE = 60;
        // 过期时间, 一小时
        Integer EXPIRE_TIME_HOUR = 60 * 60;
        // 过期时间, 一天
        Integer EXPIRE_TIME_DAY = 60 * 60 * 24;
        String TOKEN_PREFIX = "token:";
        String MSG_CONSUMER_PREFIX = "consumer:";
        String ACCESS_LIMIT_PREFIX = "accessLimit:";
        String FUND_RANK = "fundRank";
        String FUND_LIST = "fundList";
    }

    public interface LogType {
        // 登录
        Integer LOGIN = 1;
        // 登出
        Integer LOGOUT = 2;
    }

    /**
     * 相较于生产者对消息的角度来设置的此项枚举值
     */
    public interface MsgLogStatus {
        // 消息投递中
        Integer DELIVERING = 0;
        // 投递成功
        Integer DELIVER_SUCCESS = 1;
        // 投递失败
        Integer DELIVER_FAIL = 2;
        // 已消费
        Integer CONSUMED_SUCCESS = 3;
    }

    public enum CalculateTypeEnum {
        ADD(1, "加"),
        SUBTRACT(2, "减"),
        MULTIPLY(3, "乘"),
        DIVIDE(4, "除")
        ;

        Integer type;
        String desc;

        CalculateTypeEnum(Integer type, String desc) {
            this.type = type;
            this.desc = desc;
        }

        public Integer getType() {
            return type;
        }

        public String getDesc() {
            return desc;
        }
    }

    public enum FundSortType {
        ASC("asc"),
        DESC("desc"),
        ;

        private String type;

        FundSortType(String type) {
            this.type = type;
        }

        public String getType() {
            return type;
        }
    }
}

公共服务响应包装类【这个一般的项目中都会用到这个公共的封装】ServerResponse.java

package com.atguigu.gulimall.providerconsumer.common;

import com.fasterxml.jackson.annotation.JsonIgnore;
import jdk.nashorn.internal.ir.annotations.Ignore;

import java.io.Serializable;

/**
 * 公共服务响应包装类【这个一般的项目中都会用到这个公共的封装】
 * @author: jd
 * @create: 2024-06-27
 */
public class ServerResponse  implements Serializable {

    private static final long serialVersionUID = 7498483649536881777L;

    private Integer status;

    private String msg;

    private Object data;

    public ServerResponse() {
    }

    public ServerResponse(Integer status, String msg, Object data) {
        this.status = status;
        this.msg = msg;
        this.data = data;
    }

    /**
     * @JsonIgnore注解在Java中主要用于处理JSON序列化和反序列化过程,其具体作用如下:
     *
     * 忽略属性:当在Java对象的某个属性或方法上使用@JsonIgnore注解时,该属性或方法对应的属性在序列化为JSON字符串时会被忽略,同样地,在将JSON字符串反序列化为Java对象时,该属性或方法对应的属性也不会被解析。
     * 当用在属性上时:表示忽略该属性的序列化和反序列化。
     * 当用在方法上时:表示忽略该方法对应的属性的序列化和反序列化。
     * 保护敏感信息:在实际应用中,@JsonIgnore注解可以用于隐藏一些敏感信息,比如密码、token等,确保这些信息不会被发送到客户端或存储在不安全的地方。
     * 减少数据大小:通过忽略一些不必要的属性,可以减少序列化后的JSON数据大小,提高数据传输效率。
     * 解决循环引用问题:当对象之间存在循环引用时,使用@JsonIgnore注解可以避免在序列化过程中出现无限递归的情况。
     * 提高程序的可维护性和安全性:通过精确控制哪些属性参与序列化和反序列化,可以使得程序更加健壮,减少潜在的安全风险。
     * 需要注意的是,@JsonIgnore注解是Jackson库提供的,因此需要确保项目中引入了Jackson库的相关依赖。同时,在使用@JsonIgnore注解时要确保被标记的属性或方法确实不需要参与序列化和反序列化,否则可能会导致意外的结果。
     *
     * 总之,@JsonIgnore注解在Java对象和JSON之间的转换过程中起到了非常重要的作用,能够帮助我们更灵活地控制序列化和反序列化的行为。
     * @return
     */
    @JsonIgnore
    public boolean isSuccess() {
        return this.status == ResponseCode.SUCCESS.getCode();
    }


    public static ServerResponse success() {
        return new ServerResponse(ResponseCode.SUCCESS.getCode(), null, null);
    }

    public static ServerResponse success(String msg) {
        return new ServerResponse(ResponseCode.SUCCESS.getCode(), msg, null);
    }

    public static ServerResponse success(Object data) {
        return new ServerResponse(ResponseCode.SUCCESS.getCode(), null, data);
    }

    public static ServerResponse success(String msg, Object data) {
        return new ServerResponse(ResponseCode.SUCCESS.getCode(), msg, data);
    }

    public static ServerResponse error(String msg) {
        return new ServerResponse(ResponseCode.ERROR.getCode(), msg, null);
    }

    public static ServerResponse error(Object data) {
        return new ServerResponse(ResponseCode.ERROR.getCode(), null, data);
    }

    public static ServerResponse error(String msg, Object data) {
        return new ServerResponse(ResponseCode.ERROR.getCode(), msg, data);
    }

    public Integer getStatus() {
        return status;
    }

    public void setStatus(Integer status) {
        this.status = status;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public Object getData() {
        return data;
    }

    public void setData(Object data) {
        this.data = data;
    }
}

服务响应状态码 大部分的服务中都会用到这个公共的状态码类 ResponseCode.java

package com.atguigu.gulimall.providerconsumer.common;

/**
 * 服务响应状态码  大部分的服务中都会用到这个公共的状态码类
 */
public enum ResponseCode {
    // 系统模块
    SUCCESS(0, "操作成功"),
    ERROR(1, "操作失败"),
    SERVER_ERROR(500, "服务器异常"),

    // 通用模块 1xxxx
    ILLEGAL_ARGUMENT(10000, "参数不合法"),
    REPETITIVE_OPERATION(10001, "请勿重复操作"),
    ACCESS_LIMIT(10002, "请求太频繁, 请稍后再试"),
    MAIL_SEND_SUCCESS(10003, "邮件发送成功"),

    // 用户模块 2xxxx
    NEED_LOGIN(20001, "登录失效"),
    USERNAME_OR_PASSWORD_EMPTY(20002, "用户名或密码不能为空"),
    USERNAME_OR_PASSWORD_WRONG(20003, "用户名或密码错误"),
    USER_NOT_EXISTS(20004, "用户不存在"),
    WRONG_PASSWORD(20005, "密码错误"),
    ;


    private Integer code;

    private String msg;

    ResponseCode(Integer code, String msg) {
        this.code = code;
        this.msg = msg;
    }

    public Integer getCode() {
        return code;
    }

    public void setCode(Integer code) {
        this.code = code;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }
}

4、工具类

时间字符操作类 JodaTimeUtil.java

package com.atguigu.gulimall.providerconsumer.util;

import com.alibaba.cloud.commons.lang.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import java.util.Date;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
/**
 * 时间字符操作类 JodaTimeUtil
 * @author: jd
 * @create: 2024-06-27
 */
@Slf4j
public class JodaTimeUtil {


    private static final String STANDARD_FORMAT = "yyyy-MM-dd HH:mm:ss";

    /**
     * date类型 -> string类型
     *
     * @param date
     * @return
     */
    public static String dateToStr(Date date) {
        return dateToStr(date, STANDARD_FORMAT);
    }

    /**
     * date类型 -> string类型
     *
     * @param date
     * @param format 自定义日期格式
     * @return
     */
    public static String dateToStr(Date date, String format) {
        if (date == null) {
            return null;
        }

        format = StringUtils.isBlank(format) ? STANDARD_FORMAT : format;
        DateTime dateTime = new DateTime(date);
        return dateTime.toString(format);
    }

    /**
     * string类型 -> date类型
     *
     * @param timeStr
     * @return
     */
    public static Date strToDate(String timeStr) {
        return strToDate(timeStr, STANDARD_FORMAT);
    }

    /**
     * string类型 -> date类型
     *
     * @param timeStr
     * @param format  自定义日期格式
     * @return
     */
    public static Date strToDate(String timeStr, String format) {
        if (StringUtils.isBlank(timeStr)) {
            return null;
        }

        format = StringUtils.isBlank(format) ? STANDARD_FORMAT : format;

        DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern(format);
        DateTime dateTime;
        try {
            dateTime = dateTimeFormatter.parseDateTime(timeStr);
        } catch (Exception e) {
            log.error("strToDate error: timeStr: {}", timeStr, e);
            return null;
        }

        return dateTime.toDate();
    }

    /**
     * 判断date日期是否过期(与当前时刻比较)
     *
     * @param date
     * @return
     */
    public static Boolean isTimeExpired(Date date) {
        String timeStr = dateToStr(date);
        return isBeforeNow(timeStr);
    }

    /**
     * 判断date日期是否过期(与当前时刻比较)
     *
     * @param timeStr
     * @return
     */
    public static Boolean isTimeExpired(String timeStr) {
        if (StringUtils.isBlank(timeStr)) {
            return true;
        }

        return isBeforeNow(timeStr);
    }

    /**
     * 判断timeStr是否在当前时刻之前
     *
     * @param timeStr
     * @return
     */
    private static Boolean isBeforeNow(String timeStr) {
        DateTimeFormatter format = DateTimeFormat.forPattern(STANDARD_FORMAT);
        DateTime dateTime;
        try {
            dateTime = DateTime.parse(timeStr, format);
        } catch (Exception e) {
            log.error("isBeforeNow error: timeStr: {}", timeStr, e);
            return null;
        }
        return dateTime.isBeforeNow();
    }

    /**
     * 日期加天数
     *
     * @param date
     * @param days
     * @return
     */
    public static Date plusDays(Date date, int days) {
        return plusOrMinusDays(date, days, 0);
    }

    /**
     * 日期减天数
     *
     * @param date
     * @param days
     * @return
     */
    public static Date minusDays(Date date, int days) {
        return plusOrMinusDays(date, days, 1);
    }

    /**
     * 加减天数
     *
     * @param date
     * @param days
     * @param type 0:加天数 1:减天数
     * @return
     */
    private static Date plusOrMinusDays(Date date, int days, Integer type) {
        if (null == date) {
            return null;
        }

        DateTime dateTime = new DateTime(date);
        if (type == 0) {
            dateTime = dateTime.plusDays(days);
        } else {
            dateTime = dateTime.minusDays(days);
        }

        return dateTime.toDate();
    }

    /**
     * 日期加分钟
     *
     * @param date
     * @param minutes
     * @return
     */
    public static Date plusMinutes(Date date, int minutes) {
        return plusOrMinusMinutes(date, minutes, 0);
    }

    /**
     * 日期减分钟
     *
     * @param date
     * @param minutes
     * @return
     */
    public static Date minusMinutes(Date date, int minutes) {
        return plusOrMinusMinutes(date, minutes, 1);
    }

    /**
     * 加减分钟
     *
     * @param date
     * @param minutes
     * @param type    0:加分钟 1:减分钟
     * @return
     */
    private static Date plusOrMinusMinutes(Date date, int minutes, Integer type) {
        if (null == date) {
            return null;
        }

        DateTime dateTime = new DateTime(date);
        if (type == 0) {
            dateTime = dateTime.plusMinutes(minutes);
        } else {
            dateTime = dateTime.minusMinutes(minutes);
        }

        return dateTime.toDate();
    }

    /**
     * 日期加月份
     *
     * @param date
     * @param months
     * @return
     */
    public static Date plusMonths(Date date, int months) {
        return plusOrMinusMonths(date, months, 0);
    }

    /**
     * 日期减月份
     *
     * @param date
     * @param months
     * @return
     */
    public static Date minusMonths(Date date, int months) {
        return plusOrMinusMonths(date, months, 1);
    }

    /**
     * 加减月份
     *
     * @param date
     * @param months
     * @param type   0:加月份 1:减月份
     * @return
     */
    private static Date plusOrMinusMonths(Date date, int months, Integer type) {
        if (null == date) {
            return null;
        }

        DateTime dateTime = new DateTime(date);
        if (type == 0) {
            dateTime = dateTime.plusMonths(months);
        } else {
            dateTime = dateTime.minusMonths(months);
        }

        return dateTime.toDate();
    }

    /**
     * 判断target是否在开始和结束时间之间
     *
     * @param target
     * @param startTime
     * @param endTime
     * @return
     */
    public static Boolean isBetweenStartAndEndTime(Date target, Date startTime, Date endTime) {
        if (null == target || null == startTime || null == endTime) {
            return false;
        }

        DateTime dateTime = new DateTime(target);
        return dateTime.isAfter(startTime.getTime()) && dateTime.isBefore(endTime.getTime());
    }
}

Object 和String互转类 JsonUtil

package com.atguigu.gulimall.providerconsumer.util;

import com.alibaba.cloud.commons.lang.StringUtils;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import lombok.extern.slf4j.Slf4j;

import java.text.SimpleDateFormat;

/**
 * Object 和String互转类
 * @author: jd
 * @create: 2024-06-27
 */
@Slf4j
public class JsonUtil {
    private static ObjectMapper objectMapper = new ObjectMapper();
    private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";

    static {
        // 对象的所有字段全部列入
        objectMapper.setSerializationInclusion(JsonInclude.Include.ALWAYS);
        // 取消默认转换timestamps形式
        objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
        // 忽略空bean转json的错误
        objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
        // 统一日期格式
        objectMapper.setDateFormat(new SimpleDateFormat(DATE_FORMAT));
        // 忽略在json字符串中存在, 但在java对象中不存在对应属性的情况, 防止错误
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    }

    /**
     * 将Object转化为String对象
     * @param obj
     * @param <T>
     * @return
     */
    public static <T> String objToStr(T obj) {
        if (null == obj) {
            return null;
        }

        try {
            return obj instanceof String ? (String) obj : objectMapper.writeValueAsString(obj);
        } catch (Exception e) {
            log.warn("objToStr error: ", e);
            return null;
        }
    }

    /**
     * 将字符串转化成Object对象
     * @param str   待转的字符串
     * @param clazz 类名
     * @param <T>
     * @return
     */
    public static <T> T strToObj(String str, Class<T> clazz) {
        if (StringUtils.isBlank(str) || null == clazz) {
            return null;
        }

        try {
            return clazz.equals(String.class) ? (T) str : objectMapper.readValue(str, clazz);
        } catch (Exception e) {
            log.warn("strToObj error: ", e);
            return null;
        }
    }

    public static <T> T strToObj(String str, TypeReference<T> typeReference) {
        if (StringUtils.isBlank(str) || null == typeReference) {
            return null;
        }

        try {
            return (T) (typeReference.getType().equals(String.class) ? str : objectMapper.readValue(str, typeReference));
        } catch (Exception e) {
            log.error("strToObj error", e);
            return null;
        }
    }
}

发送邮件工具类 MailUtil.java

package com.atguigu.gulimall.providerconsumer.util;

import com.atguigu.gulimall.providerconsumer.pojo.Mail;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.mail.MailException;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Component;

import javax.mail.internet.AddressException;
import javax.mail.internet.InternetAddress;

/**
 *
 * 发送邮件工具类
 * @author: jd
 * @create: 2024-06-27
 */

@Component
@Slf4j
public class MailUtil {

    @Value("${spring.mail.from}")    //这里从application.xml中拿不到配置信息,所以从这里直接写死了
    private String from ="15131650119@163.com";

    @Autowired
    private JavaMailSender mailSender;


    public boolean send(Mail mail) throws AddressException {
        //模拟消费成功,但是业务实际没成功,此时会重新入队列,不会造成消息丢失
//        if(true){
//            return false;
//        }
        String to = mail.getTo();// 目标邮箱
        String title = mail.getTitle();// 邮件标题
        String content = mail.getContent();// 邮件正文

        SimpleMailMessage message = new SimpleMailMessage();
        message.setFrom(String.valueOf(new InternetAddress(from)));  //设置发送人
        message.setTo(to);  //设置目标账户
        message.setSubject(title); //设置邮件标题
        message.setText(content);  //设置邮件内容


        try {
            log.info("===================>开始发送邮件");
            mailSender.send(message);
            log.info("===================>邮件发送成功");
            return true;
        } catch (MailException e) {
            log.error("=============>邮件发送失败, to: {}, title: {}", to, title, e);
            return false;
        }


    }


}

SpringBeanUtil.java 获取BeanSpring容器类

package com.atguigu.gulimall.providerconsumer.util;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * @author: jd
 * @create: 2024-06-27
 */
@Component
public class SpringBeanUtil implements ApplicationContextAware {


    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext)
            throws BeansException {
        SpringBeanUtil.applicationContext = applicationContext;
    }

    /**
     * 通过名称在spring容器中获取对象
     *
     * @param beanName
     * @return
     */
    public static Object getBean(String beanName) {
        System.out.println(applicationContext);
        return applicationContext.getBean(beanName);
    }

}

5、RabbitMQ消费者、生产者配置类

A、MQ生产者:

TestController.java

package com.atguigu.gulimall.providerconsumer.service.impl;

import com.atguigu.gulimall.providerconsumer.common.ResponseCode;
import com.atguigu.gulimall.providerconsumer.common.ServerResponse;
import com.atguigu.gulimall.providerconsumer.config.RabbitConfig;
import com.atguigu.gulimall.providerconsumer.mapper.MsgLogMapper;
import com.atguigu.gulimall.providerconsumer.mq.MessageHelper;
import com.atguigu.gulimall.providerconsumer.pojo.Mail;
import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;
import com.atguigu.gulimall.providerconsumer.service.TestService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

/**
 * 消息生产接口实现类
 * @author: jd
 * @create: 2024-06-27
 */
@Service
@Slf4j
public class TestServiceImpl  implements TestService {

    @Autowired
    private MsgLogMapper msgLogMapper;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public ServerResponse testIdempotence() {
        return ServerResponse.success("testIdempotence: success");
    }

    @Override
    public ServerResponse accessLimit() {
        return ServerResponse.success("accessLimit: success");
    }

    @Override
    public ServerResponse send(Mail mail) {
        // 1. 生产唯一业务标识
        String msgId = String.valueOf(UUID.randomUUID());  //业务的唯一标识
        mail.setMsgId(msgId);

        //2.记录日志
        MsgLog msgLog = new MsgLog(msgId, mail, RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME);
        msgLogMapper.insertMsgLog(msgLog);// 消息入库  先记录日志

        //3.真正发送消息到MQ中
        CorrelationData correlationData = new CorrelationData(msgId);
        rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME,
                MessageHelper.objToMsg(mail), correlationData);// 发送消息

        log.info("====================>消息已发送队列");
        //返回公共的响应结果
        return ServerResponse.success(ResponseCode.MAIL_SEND_SUCCESS.getMsg());
    }
}

队列 交换机配置,用于消息生产者:RabbitConfig.java

package com.atguigu.gulimall.providerconsumer.config;

import com.atguigu.gulimall.providerconsumer.common.Constant;
import com.atguigu.gulimall.providerconsumer.service.MsgLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

/**
 *
 * 队列 交换机配置,用于消息生产者
 * @author: jd
 * @create: 2024-06-27
 */

@Slf4j
@Component
@Configuration
public class RabbitConfig {

    @Autowired
    private MsgLogService msgLogService;

    // 发送邮件
    public static final String MAIL_QUEUE_NAME = "mail.queue";
    public static final String MAIL_EXCHANGE_NAME = "mail.exchange";
    public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key";

    @Bean
    public Queue mailQueue() {
        return new Queue(MAIL_QUEUE_NAME, true);
    }

    @Bean
    public DirectExchange mailExchange() {
        return new DirectExchange(MAIL_EXCHANGE_NAME, true, false);
    }

    @Bean
    public Binding mailBinding() {
        return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME);
    }

//    @Autowired
//    private CachingConnectionFactory connectionFactory;

//    ConnectionFactory connectionFactory = (ConnectionFactory) SpringBeanUtil.getBean("connectionFactory");

    /**
     * 设置生产者消息确认回调函数
     *
     */
    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory  connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setMessageConverter(converter());
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    log.info("消息成功发送到Exchange");
                    String msgId = correlationData.getId();
                    msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_SUCCESS);
                } else {
                    log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
                }
                System.out.println("ConfirmCallback回调:     "+"相关数据:"+correlationData);
                System.out.println("ConfirmCallback回调:     "+"确认情况:"+ack);
                System.out.println("ConfirmCallback回调:     "+"原因:"+cause);
            }
        });

        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {

                System.out.println("ReturnCallback回调:     "+"消息:"+returnedMessage.getMessage());
                System.out.println("ReturnCallback回调:     "+"回应码:"+returnedMessage.getReplyCode());
                System.out.println("ReturnCallback回调:     "+"回应信息:"+returnedMessage.getReplyText());
                System.out.println("ReturnCallback回调:     "+"交换机:"+returnedMessage.getExchange());
                System.out.println("ReturnCallback回调:     "+"路由键:"+returnedMessage.getRoutingKey());
                log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}",
                        returnedMessage.getExchange(),
                        returnedMessage.getRoutingKey(),
                        returnedMessage.getReplyCode(),
                        returnedMessage.getReplyText(),
                        returnedMessage.getMessage());

            }
        });

        return rabbitTemplate;
    }


    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }


}

B、MQ 消费者 其实就完成了3件事: 1.保证消费幂等性, 2.发送邮件, 3.更新消息状态, 手动ack
package com.atguigu.gulimall.providerconsumer.mq.consumer;

import com.atguigu.gulimall.providerconsumer.common.Constant;
import com.atguigu.gulimall.providerconsumer.config.RabbitConfig;
import com.atguigu.gulimall.providerconsumer.mq.MessageHelper;
import com.atguigu.gulimall.providerconsumer.pojo.Mail;
import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;
import com.atguigu.gulimall.providerconsumer.service.MsgLogService;
import com.atguigu.gulimall.providerconsumer.util.JsonUtil;
import com.atguigu.gulimall.providerconsumer.util.MailUtil;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.mail.internet.AddressException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;

/**
 * MQ 监听者,操作业务(发送邮件)
 * 其实就完成了3件事:
 *      1.保证消费幂等性, 2.发送邮件, 3.更新消息状态, 手动ack
 * @author: jd
 * @create: 2024-06-27
 */
@Component
@Slf4j
@RabbitListener(queues = RabbitConfig.MAIL_QUEUE_NAME)  //指定监听队列
public class MailConsumer {

    @Autowired
     private MsgLogService msgLogService;

    @Autowired
    private MailUtil mailUtil;

    @RabbitHandler(isDefault = true)   //指定监听后的处理动作
    public void consume(Message message, Channel channel) throws IOException, AddressException {
        //将Message中的业务数据转化成Mail对象
        Mail mail = MessageHelper.msgToObj(message, Mail.class);
        log.info("================>消费者收到消息: {}", mail.toString());
        log.debug("=========测试debug和info有什么区别======");
        //根据ID查询Msg对象
        String msgId = mail.getMsgId();
        MsgLog msgLog = msgLogService.selectByMsgId(msgId);
        // 消费幂等性
        if (null == msgLog || msgLog.getStatus().equals(Constant.MsgLogStatus.CONSUMED_SUCCESS)) {
            log.info("===========>消费者重复消费,此时不进行消费 ,msgId: {}", msgId);
            //直接终止程序运行,程序返回
            return;
        }

        //拿到MQ中的每一条消息的唯一标识Tag
        MessageProperties properties = message.getMessageProperties();
        long tag = properties.getDeliveryTag();

        //业务操作:发送邮件
        log.info("================>准备发送邮件");
        boolean send = mailUtil.send(mail);
//
        try {
            //如果发送邮件成功,则修改消息状态为 已消费
            if(send){
                //发送成功后更新消息日志表的消息记录状态
                msgLogService.updateStatus(msgId, Constant.MsgLogStatus.CONSUMED_SUCCESS);
                //取得进程ID
                Thread t = Thread.currentThread();
                log.info("【消息队列】current request consumer success, request info: {}; thread info: {};", JsonUtil.objToStr(mail), t);
                // 消费确认,设置反馈给MQ
                channel.basicAck(tag, false);
            }else {
                log.error("【消息队列】consumer failed,, msg info: {}", JsonUtil.objToStr(mail));
                channel.basicNack(tag, false, true);  //这样会告诉rabbitmq该消息消费失败, 需要重新入队, 可以重新投递到其他正常的消费端进行消费, 从而保证消息不被丢失
            }
        } catch (Exception e) {
            //产生异常之后,则不消费,直接拒绝此消息,不进行消费;这样会导致这条失败的消息会一直存在队列里面,然后定时任务过一会在数据库中扫到这个信息之后,会再去MQ中拿这个消息进行消费
            e.printStackTrace();
            ByteArrayOutputStream bass = new ByteArrayOutputStream();
            e.printStackTrace(new PrintStream(bass));
            log.error("【消息队列】consumer error, error info: {}, msg info: {}", bass, JsonUtil.objToStr(mail));
            channel.basicNack(tag, false, true);
        }


    }

}

6、定时任务重发: ResendMsg.java (说明: 每一条消息都和exchange routingKey绑定, 所有消息重投共用这一个定时任务即可)

package com.atguigu.gulimall.providerconsumer.task;

import com.atguigu.gulimall.providerconsumer.common.Constant;
import com.atguigu.gulimall.providerconsumer.config.RabbitConfig;
import com.atguigu.gulimall.providerconsumer.mq.MessageHelper;
import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;
import com.atguigu.gulimall.providerconsumer.service.MsgLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Correlation;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * 消息重发定时任务
 * @author: jd
 * @create: 2024-06-28
 */
@Component
@Slf4j
public class ResendMsg {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 最大投递次数。第四次投递失败
    private static final int MAX_TRY_COUNT = 3;

    @Autowired
    private MsgLogService msgLogService;

    /**
     * 每30s拉取投递失败的消息, 重新投递
     */
    @Scheduled(cron = "0/30 * * * * ?")
    public void reSend(){
        log.info("开始执行定时任务(重新投递消息)");
        List<MsgLog> msgLogs = msgLogService.selectTimeoutMsg();  //查询还在投递中的消息
        msgLogs.forEach(msgLog->{
            String msgId = msgLog.getMsgId();
            //超过投递次数则不会重新投递中的消息是否需要投递
            if(msgLog.getTryCount()>=MAX_TRY_COUNT){
                //不需要重新投递
                msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_FAIL);
                log.info("消息ID {}超过最大的投递次数 {} 次,投递失败,需要人工查看!",msgId,MAX_TRY_COUNT);
            }else {
                //拿到消息在表中的本次重试时间,去获取下一次重试时间  同时 投递次数+1
                msgLogService.updateTryCount(msgId,msgLog.getNextTryTime());
                CorrelationData correlationData = new CorrelationData(msgId);//携带业务信息,作为业务的唯一标识
                //重新发送消息到MQ,让MQ去重新尝试消费这一条之前没有发送到MQ的消息(因为我们现在查的消息的状态是status =0 的代表是消息还是投递中的,没有变成投递成功的消息,肯定是投递有问题)
                rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME,
                        RabbitConfig.MAIL_ROUTING_KEY_NAME,  //每一条消息都和exchange routingKey绑定, 所有消息重投共用这一个定时任务即可
                        MessageHelper.objToMsg(msgLog),
                        correlationData);
                log.info("第 " + (msgLog.getTryCount() + 1) + " 次重新投递消息");
            }
        });
        log.info("定时任务执行结束(重新投递消息)");  //

    }


}

四、基本测试

OK, 目前为止, 代码准备就绪, 现在进行正常流程的测试 1.发送请求:
在这里插入图片描述
后台日志:
在这里插入图片描述
3.库消息记录:
在这里插入图片描述
状态为3, 表明已消费, 消息重试次数为0, 表明一次投递就成功了,此时就可以到目标邮箱中去查看是否接收到了这个邮件

五、异常情况测试

1.验证消息发送到Exchange失败情况下的回调, 对应上图P -> X

如何验证? 可以随便指定一个不存在的交换机名称, 请求接口, 看是否会触发回调
在这里插入图片描述
发送失败, 原因: reply-code=404, reply-text=NOT_FOUND - no exchange ‘mail.exchangeabcd’ in vhost ‘/’, 该回调能够保证消息正确发送到Exchange, 测试完成

2.验证消息从Exchange路由到Queue失败情况下的回调, 对应上图X -> Q 同理, 修改一下路由键为不存在的即可, 路由失败, 触发回调
在这里插入图片描述
发送失败, 原因: route: mail.routing.keyabcd, replyCode: 312, replyText: NO_ROUTE

3.验证在手动ack模式下, 消费端必须进行手动确认(ack), 否则消息会一直保存在队列中, 直到被消费, 对应上图Q -> C 将消费端代码channel.basicAck(tag, false);// 消费确认注释掉, 查看控制台和rabbitmq管控台
在这里插入图片描述
在这里插入图片描述
可以看到, 虽然消息确实被消费了, 但是由于是手动确认模式, 而最后又没手动确认, 所以, 消息仍被rabbitmq保存, 所以, 手动ack能够保证消息一定被消费, 但一定要记得basicAck

4.验证消费端幂等性 接着上一步, 去掉注释, 重启服务器, 由于有一条未被ack的消息, 所以重启后监听到消息, 进行消费, 但是由于消费前会判断该消息的状态是否未被消费, 发现status=3, 即已消费, 所以, 直接return, 这样就保证了消费端的幂等性, 即使由于网络等原因投递成功而未触发回调, 从而多次投递, 也不会重复消费进而发生业务异常
在这里插入图片描述

5.验证消费端发生异常消息也不会丢失 很显然, 消费端代码可能发生异常, 如果不做处理, 业务没正确执行, 消息却不见了, 给我们感觉就是消息丢失了, 由于我们消费端代码做了异常捕获, 业务异常时, 会触发: channel.basicNack(tag, false, true);, 这样会告诉rabbitmq该消息消费失败, 需要重新入队, 可以重新投递到其他正常的消费端进行消费, 从而保证消息不被丢失 测试: send方法直接返回false即可(这里跟抛出异常一个意思),因为我们向MQ插入了消息,但是实际业务消费了,但是发送邮件返回了false,这样会从新投递到MQ队列中,再进行消费,一直重复。
代码修改:
在这里插入图片描述
结果:
在这里插入图片描述

可以看到, 由于channel.basicNack(tag, false, true), 未被ack的消息(unacked)会重新入队并被消费, 这样就保证了消息不会走丢

6.验证定时任务的消息重投 实际应用场景中, 可能由于网络原因, 或者消息未被持久化MQ就宕机了, 使得投递确认的回调方法ConfirmCallback没有被执行, 从而导致数据库该消息状态一直是投递中的状态, 此时就需要进行消息重投, 即使也许消息已经被消费了 定时任务只是保证消息100%投递成功, 而多次投递的消费幂等性需要消费端自己保证 我们可以将回调和消费成功后更新消息状态的代码注释掉, 开启定时任务, 查看是否重投

这是没有异常信息的情况下,定时任务每次都不会做实际的业务:
在这里插入图片描述
当我们对一条消息,进行了实际的业务处理,而且也业务处理成功了,只是没有把状态修改成成功,这样定时任务会扫,重新入队列,但是有幂等性校验,所以一直发送到队列将这条信息,直到3次后,消息会被更新为发送失败
在这里插入图片描述

在这里插入图片描述

发送邮件其实很简单, 但深究起来其实有很多需要注意和完善的点, 一个看似很小的知识点, 也可以引申出很多问题, 甚至涉及到方方面面, 这些都需要自己踩坑, 当然我这代码肯定还有很多不完善和需要优化的点, 希望小伙伴多多提意见和建议 我的代码都是经过自测验证过的, 图也都是一点一点自己画的或认真截的, 希望小伙伴能学到一点东西, 路过的点个赞或点个关注呗, 谢谢

部分参考:springboot + rabbitmq发送邮件实战(保证消息100%投递成功并被消费)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1887485.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

linux内核驱动第一课(基于RK3568)

学习Linux驱动需要以下基础知识&#xff1a; C语言编程&#xff1a;掌握C语言是开发Linux驱动程序的基本要求。操作系统原理&#xff1a;了解操作系统的基本概念和原理&#xff0c;如进程管理、内存管理、中断处理等。Linux内核&#xff1a;熟悉Linux内核的结构和工作机制&…

DreamTech联合南大和牛津发布最强3D内容生成大模型——Direct3D

文章链接&#xff1a;https://arxiv.org/pdf/2405.14832 github链接&#xff1a;https://nju-3dv.github.io/projects/Direct3D/ 从文本和图像生成高质量的3D资产一直是一项挑战&#xff0c;主要是由于缺乏能够捕捉复杂几何分布的可扩展3D表示。在这项工作中&#xff0c;介绍…

力扣61. 旋转链表(java)

思路&#xff1a;用快慢指针找到最后链表k个需要移动的节点&#xff0c;然后中间断开节点&#xff0c;原尾节点连接原头节点&#xff0c;返回新的节点即可&#xff1b; 但因为k可能比节点数大&#xff0c;所以需要先统计节点个数&#xff0c;再取模&#xff0c;看看k到底需要移…

网络爬虫基础知识

文章目录 网络爬虫基础知识爬虫的定义爬虫的工作流程常用技术和工具爬虫的应用1. 抓取天气信息2. 抓取新闻标题3. 抓取股票价格4. 抓取商品价格5. 抓取博客文章标题 网络爬虫基础知识 爬虫的定义 网络爬虫&#xff08;Web Crawler 或 Spider&#xff09;是一种自动化程序&…

gitee项目上不同的项目分别使用不用的用户上传

最近使用根据需要&#xff0c;希望不同的项目使用不同的用户上传&#xff0c;让不同的仓库展示不同的用户名&#xff01;&#xff01;&#xff01; 第一步查看全局的用户信息&#xff1a; # 查看目前全局git配置信息 git config -l #会输出全局的git配置信息 第二步进入到要设…

【java计算机毕设】高校学生管理系统MySQL springboot vue3 Maven 源码 代码

目录 1项目功能 2项目介绍 3项目地址 1项目功能 【java计算机毕设】高校学生管理系统MySQL springboot vue3 Maven 小组项目设计代码源码 2项目介绍 系统功能&#xff1a; 高校学生管理系统主要功能包含&#xff1a;学生管理&#xff0c;班主任信息管理&#xff0c;家长信息…

仓库管理系统26--权限设置

原创不易&#xff0c;打字不易&#xff0c;截图不易&#xff0c;多多点赞&#xff0c;送人玫瑰&#xff0c;留有余香&#xff0c;财务自由明日实现 1、权限概述 在应用软件中&#xff0c;通常将软件的功能分为若干个子程序&#xff0c;通过主程序调用。那么&#xff0c;通过…

Python的matplotlib简单操作及图像闪屏问题

1.显示一个sinx的图像 import matplotlib.pyplot as plt import numpy as np xnp.linspace(0,10,100)#生成0到10 之间 分成100份等间隔 ynp.sin(x) # # plt.plot(x,y)#放入x与y plt.title("ysin(x)")#给图像命名 plt.xlabel("x")#设置x位置的名字 plt.yl…

HarmonyOS开发实战:UDP通讯示例规范

1. UDP简介 UDP协议是传输层协议的一种&#xff0c;它不需要建立连接&#xff0c;是不可靠、无序的&#xff0c;相对于TCP协议报文更简单&#xff0c;在特定场景下有更高的数据传输效率&#xff0c;在现代的网络通讯中有广泛的应用&#xff0c;以最新的HTTP/3为例&#xff0c;…

无需修改代码,深入探究 pytest 如何自动查找并加载三方插件

相信测试的同学或者python开发同学&#xff0c;都知道pytest框架&#xff0c;pytest不仅是一个功能强大的测试框架&#xff0c;同时还是一个插件化的测试平台。 插件只需配置就可以直接使用&#xff0c;而不需要测试代码配合。如果安装了插件&#xff0c;pytest则可以自动查找…

基于python的随机森林回归预测+贝叶斯优化超参数前后训练效果对比

目录 1.导入必要的库 2.导入数据与数据预处理 3.查看数据分布 4.特征选择 5.模型建立与训练 6.训练集预测结果 7.模型评估 8.预测新数据 9.贝叶斯优化超参数 1.导入必要的库 # 导入所需的库 from sklearn.model_selection import cross_val_score import pandas as …

【聊聊原子性,中断,以及nodejs中的具体示例】

什么是原子性 从一个例子说起&#xff0c; x &#xff0c;读和写 &#xff0c; 如图假设多线程&#xff0c;线程1和线程2同时操作变量x&#xff0c;进行x的操作&#xff0c;那么由于写的过程中&#xff0c;都会先读一份x数据到cpu的寄存器中&#xff0c;所以这个时候cpu1 和 c…

Michael.W基于Foundry精读Openzeppelin第61期——ERC1967Upgrade.sol

Michael.W基于Foundry精读Openzeppelin第61期——ERC1967Upgrade.sol 0. 版本0.1 ERC1967Upgrade.sol 1. 目标合约2. 代码精读2.1 _getImplementation() internal && _upgradeTo(address newImplementation) internal2.2 _upgradeToAndCall(address newImplementation,…

11--ElasticStack7-ELK+Kafka

前言&#xff1a;日志分析管理平台对于平时的规模化运维占的权重非常大&#xff0c;这一章涉及的程序较多&#xff0c;会将每个程序的基础使用和模块分开梳理&#xff0c;基础概念会分布在每小节开头&#xff0c;最后串联成一个完整的工作环境。 1、ELK架构 ELK 是一个非常流…

基于机器学习的永磁同步电机矢量控制策略-高分资源-下载可用!

基于机器学习的永磁同步电机矢量控制策略 优势 训练了RL-Agent&#xff0c;能够提高电机在非线性负载下的性能。 部分程序 仿真结果 转矩估计及dq轴电流。 代码有偿&#xff0c;50&#xff0c;需要的可以联系。

【STM32HAL库学习】通信方式:USART、IIC、SPI

通信的目的&#xff1a;将一个设备的数据传送到另一个设备&#xff0c;扩展硬件系统 通信接口区别 名称引脚双工时钟电平设备USARTTX、RX全双工异步单端点对点I2CSCL、SDA半双工同步单端多设备SPISCLK、MOSI、MISO、CS全双工同步单端多设备CANCAN_H、CAN_L半双工异步差分多设…

【antd + vue】表格行合并,同时使用插槽

一、需求说明 表格中&#xff0c;如果一个学校有多个考试科目&#xff0c;则分行展示&#xff0c;其余列&#xff0c;则合并为一行展示&#xff0c;如图所示 二、需求分析 1、表格行合并 相当于有4行&#xff0c;其中1、2行是同一个学校包含不同考试科目及对应人次的数据&am…

COB封装的LED显示屏是什么?

COB&#xff08;Chip on Board&#xff09;封装的LED显示屏&#xff0c;是一种采用先进倒装COB封装技术的显示屏&#xff0c;其中LED芯片是直接被安装并封装在PCB电路板上&#xff0c;而不是先对单个封装再焊接至电路板&#xff0c;与SMD&#xff08;Surface Mount Device&…

Java知识点整理 18 — Lambda表达式

一. 简介 Lambda 表达式是函数式编程思想的体现&#xff0c;强调做什么&#xff0c;而不是以什么方式去做。 面向对象编程思想强调的是对象&#xff0c;必须通过对象的形式来做一些事情。比如多线程执行任务&#xff0c;需要创建对象&#xff0c;对象需要实现指定接口&#x…

【吴恩达机器学习-week2】可选实验:使用 Scikit-Learn 进行线性回归

支持我的工作 &#x1f389; &#x1f4c3;亲爱的朋友们&#xff0c;感谢你们一直以来对我的关注和支持&#xff01; &#x1f4aa;&#x1f3fb; 为了提供更优质的内容和更有趣的创作&#xff0c;我付出了大量的时间和精力。如果你觉得我的内容对你有帮助或带来了欢乐&#xf…