RabbitMQ可靠性消息发送(java实现)

news2025/1/1 11:19:07

本博客属于 《RabbitMQ基础组件封装—整体结构》的子博客

一、整体架构

step1:消息落库,业务数据存库的同时,也要将消息记录存入数据库,二者要保证原子性;

step2:Producer发送消息到MQ Broker;

step3:Producer收到 broker 返回的确认消息;

step4:更改消息记录库的状态(定义三种状态:0待确认、1已确认、2确认失败);

step5:定时任务获取长时间处于待确认状态的消息;

step6:Producer重试发送消息;

step7:重试次数超过3次,将消息状态更新为确认失败,后续根据具体业务再处理确认失败的消息;

二、消息记录的增删改查

1. 当前项目名为 rabbit-core-producer,为了实现消息记录入库,需要跟数据库打交道,这里首先添加依赖:

        <dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-jdbc</artifactId>
		</dependency>
	    <dependency>
	      <groupId>org.mybatis.spring.boot</groupId>
	      <artifactId>mybatis-spring-boot-starter</artifactId>
	      <version>1.1.1</version>
	    </dependency>
		<dependency>
		    <groupId>com.alibaba</groupId>
		    <artifactId>druid</artifactId>
		    <version>1.1.10</version>
		</dependency>
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
		</dependency> 

 2. 消息记录的建表语句 rabbit-producer-message-schema.sql

-- 表 broker_message.broker_message 结构
CREATE TABLE IF NOT EXISTS `broker_message` (
  `message_id` varchar(128) NOT NULL,
  `message` varchar(4000),
  `try_count` int(4) DEFAULT 0,
  `status` varchar(10) DEFAULT '',
  `next_retry` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
  `create_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
  `update_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
  PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

将 rabbit-producer-message-schema.sql 放在 rabbit-core-producer 项目下的 /src/main/resources/rabbit-producer-message-schema.sql, rabbit-core-producer项目 在 RabbitMQ基础组件封装—整体结构 有具体说明(当前博客是 RabbitMQ基础组件封装—整体结构 的其中一个章节)。

3. 数据源的配置文件 rabbit-producer-message.properties

rabbit.producer.druid.type=com.alibaba.druid.pool.DruidDataSource
rabbit.producer.druid.jdbc.url=jdbc:mysql://localhost:3306/broker_message?characterEncoding=UTF-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useUnicode=true&serverTimezone=GMT
rabbit.producer.druid.jdbc.driver-class-name=com.mysql.jdbc.Driver
rabbit.producer.druid.jdbc.username=root
rabbit.producer.druid.jdbc.password=root
rabbit.producer.druid.jdbc.initialSize=5
rabbit.producer.druid.jdbc.minIdle=1
rabbit.producer.druid.jdbc.maxActive=100
rabbit.producer.druid.jdbc.maxWait=60000
rabbit.producer.druid.jdbc.timeBetweenEvictionRunsMillis=60000
rabbit.producer.druid.jdbc.minEvictableIdleTimeMillis=300000
rabbit.producer.druid.jdbc.validationQuery=SELECT 1 FROM DUAL
rabbit.producer.druid.jdbc.testWhileIdle=true
rabbit.producer.druid.jdbc.testOnBorrow=false
rabbit.producer.druid.jdbc.testOnReturn=false
rabbit.producer.druid.jdbc.poolPreparedStatements=true
rabbit.producer.druid.jdbc.maxPoolPreparedStatementPerConnectionSize= 20
rabbit.producer.druid.jdbc.filters=stat,wall,log4j
rabbit.producer.druid.jdbc.useGlobalDataSourceStat=true

同样需要将该文件放在 rabbit-core-producer 项目下的 /src/main/resources/rabbit-producer-message.properties。

4. BrokerMessage.java

public class BrokerMessage implements Serializable {
	
	private static final long serialVersionUID = 7447792462810110841L;

	private String messageId;

    private Message message;

    private Integer tryCount = 0;

    private String status;

    private Date nextRetry;

    private Date createTime;

    private Date updateTime;

    // getter、setter方法省略
}

5. BrokerMessageMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.didiok.rabbit.producer.mapper.BrokerMessageMapper" >
  <resultMap id="BaseResultMap" type="com.didiok.rabbit.producer.entity.BrokerMessage" >
    <id column="message_id" property="messageId" jdbcType="VARCHAR" />
    <result column="message" property="message" jdbcType="VARCHAR" typeHandler="com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler" />
    <result column="try_count" property="tryCount" jdbcType="INTEGER" />
    <result column="status" property="status" jdbcType="VARCHAR" />
    <result column="next_retry" property="nextRetry" jdbcType="TIMESTAMP" />
    <result column="create_time" property="createTime" jdbcType="TIMESTAMP" />
    <result column="update_time" property="updateTime" jdbcType="TIMESTAMP" />
  </resultMap>
  <sql id="Base_Column_List" >
    message_id, message, try_count, status, next_retry, create_time, update_time
  </sql>
  <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.String" >
    select 
    <include refid="Base_Column_List" />
    from broker_message
    where message_id = #{messageId,jdbcType=VARCHAR}
  </select>
  <delete id="deleteByPrimaryKey" parameterType="java.lang.String" >
    delete from broker_message
    where message_id = #{messageId,jdbcType=VARCHAR}
  </delete>
  <insert id="insert" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >
    insert into broker_message (message_id, message, try_count, 
      status, next_retry, create_time, 
      update_time)
    values (#{messageId,jdbcType=VARCHAR}, #{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler}, #{tryCount,jdbcType=INTEGER}, 
      #{status,jdbcType=VARCHAR}, #{nextRetry,jdbcType=TIMESTAMP}, #{createTime,jdbcType=TIMESTAMP}, 
      #{updateTime,jdbcType=TIMESTAMP})
  </insert>
  <insert id="insertSelective" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >
    insert into broker_message
    <trim prefix="(" suffix=")" suffixOverrides="," >
      <if test="messageId != null" >
        message_id,
      </if>
      <if test="message != null" >
        message,
      </if>
      <if test="tryCount != null" >
        try_count,
      </if>
      <if test="status != null" >
        status,
      </if>
      <if test="nextRetry != null" >
        next_retry,
      </if>
      <if test="createTime != null" >
        create_time,
      </if>
      <if test="updateTime != null" >
        update_time,
      </if>
    </trim>
    <trim prefix="values (" suffix=")" suffixOverrides="," >
      <if test="messageId != null" >
        #{messageId,jdbcType=VARCHAR},
      </if>
      <if test="message != null" >
        #{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler},
      </if>
      <if test="tryCount != null" >
        #{tryCount,jdbcType=INTEGER},
      </if>
      <if test="status != null" >
        #{status,jdbcType=VARCHAR},
      </if>
      <if test="nextRetry != null" >
        #{nextRetry,jdbcType=TIMESTAMP},
      </if>
      <if test="createTime != null" >
        #{createTime,jdbcType=TIMESTAMP},
      </if>
      <if test="updateTime != null" >
        #{updateTime,jdbcType=TIMESTAMP},
      </if>
    </trim>
  </insert>
  <update id="updateByPrimaryKeySelective" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >
    update broker_message
    <set >
      <if test="message != null" >
        message = #{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler},
      </if>
      <if test="tryCount != null" >
        try_count = #{tryCount,jdbcType=INTEGER},
      </if>
      <if test="status != null" >
        status = #{status,jdbcType=VARCHAR},
      </if>
      <if test="nextRetry != null" >
        next_retry = #{nextRetry,jdbcType=TIMESTAMP},
      </if>
      <if test="createTime != null" >
        create_time = #{createTime,jdbcType=TIMESTAMP},
      </if>
      <if test="updateTime != null" >
        update_time = #{updateTime,jdbcType=TIMESTAMP},
      </if>
    </set>
    where message_id = #{messageId,jdbcType=VARCHAR}
  </update>
  <update id="updateByPrimaryKey" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >
    update broker_message
    set message = #{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler},
      try_count = #{tryCount,jdbcType=INTEGER},
      status = #{status,jdbcType=VARCHAR},
      next_retry = #{nextRetry,jdbcType=TIMESTAMP},
      create_time = #{createTime,jdbcType=TIMESTAMP},
      update_time = #{updateTime,jdbcType=TIMESTAMP}
    where message_id = #{messageId,jdbcType=VARCHAR}
  </update>
  

  <update id="changeBrokerMessageStatus" >
    update broker_message bm
    set bm.status = #{brokerMessageStatus,jdbcType=VARCHAR},
      	bm.update_time = #{updateTime, jdbcType=TIMESTAMP}
    where bm.message_id = #{brokerMessageId,jdbcType=VARCHAR}  
  </update>
  
  
  <select id="queryBrokerMessageStatus4Timeout" resultMap="BaseResultMap" >
  	<![CDATA[
	    select message_id, message, try_count, status, next_retry, create_time, update_time
	    from broker_message bm
	    where bm.status = #{brokerMessageStatus,jdbcType=VARCHAR}  	
	    and bm.next_retry < sysdate()
    ]]>
  </select>
  
  <select id="queryBrokerMessageStatus" resultMap="BaseResultMap" >
	    select message_id, message, try_count, status, next_retry, create_time, update_time
	    from broker_message bm
	    where bm.status = #{brokerMessageStatus,jdbcType=VARCHAR}  	
  </select>
  
  
   <update id="update4TryCount" >
    update broker_message bm
    set bm.try_count = bm.try_count + 1,
      bm.update_time = #{updateTime,jdbcType=TIMESTAMP}
    where bm.message_id = #{brokerMessageId,jdbcType=VARCHAR}
   </update>
   
   
</mapper>

6. BrokerMessageMapper.java

@Mapper
public interface BrokerMessageMapper {
	
    int deleteByPrimaryKey(String messageId);
    
    int insert(BrokerMessage record);

    int insertSelective(BrokerMessage record);

    BrokerMessage selectByPrimaryKey(String messageId);

    int updateByPrimaryKeySelective(BrokerMessage record);

    int updateByPrimaryKeyWithBLOBs(BrokerMessage record);

    int updateByPrimaryKey(BrokerMessage record);
	
	void changeBrokerMessageStatus(@Param("brokerMessageId")String brokerMessageId, @Param("brokerMessageStatus")String brokerMessageStatus, @Param("updateTime")Date updateTime);

	List<BrokerMessage> queryBrokerMessageStatus4Timeout(@Param("brokerMessageStatus")String brokerMessageStatus);
	
	List<BrokerMessage> queryBrokerMessageStatus(@Param("brokerMessageStatus")String brokerMessageStatus);
	
	int update4TryCount(@Param("brokerMessageId")String brokerMessageId, @Param("updateTime")Date updateTime);
	
}

7. MessageStoreService.java(这里不加接口类了,直接在MessageStoreService.java中写具体逻辑实现)

@Service
public class MessageStoreService {

	@Autowired
	private BrokerMessageMapper brokerMessageMapper;
	
	public int insert(BrokerMessage brokerMessage) {
		return this.brokerMessageMapper.insert(brokerMessage);
	}
	
	public BrokerMessage selectByMessageId(String messageId) {
		return this.brokerMessageMapper.selectByPrimaryKey(messageId);
	}

	public void succuess(String messageId) {
		this.brokerMessageMapper.changeBrokerMessageStatus(messageId,
				BrokerMessageStatus.SEND_OK.getCode(),
				new Date());
	}
	
	public void failure(String messageId) {
		this.brokerMessageMapper.changeBrokerMessageStatus(messageId,
				BrokerMessageStatus.SEND_FAIL.getCode(),
				new Date());
	}
	
	public List<BrokerMessage> fetchTimeOutMessage4Retry(BrokerMessageStatus brokerMessageStatus){
		return this.brokerMessageMapper.queryBrokerMessageStatus4Timeout(brokerMessageStatus.getCode());
	}
	
	public int updateTryCount(String brokerMessageId) {
		return this.brokerMessageMapper.update4TryCount(brokerMessageId, new Date());
	}
}

三、整合数据源

1. 读取配置文件,生成数据源,RabbitProducerDataSourceConfiguration.java

@Configuration
@PropertySource({"classpath:rabbit-producer-message.properties"})
public class RabbitProducerDataSourceConfiguration {
	
	private static Logger LOGGER = org.slf4j.LoggerFactory.getLogger(RabbitProducerDataSourceConfiguration.class);
	
	@Value("${rabbit.producer.druid.type}")
	private Class<? extends DataSource> dataSourceType;
	
	@Bean(name = "rabbitProducerDataSource")
	@Primary
	// 以这个rabbit.producer.druid.jdbc为前缀的属性值都会注入到DataSource中
	@ConfigurationProperties(prefix = "rabbit.producer.druid.jdbc")
	public DataSource rabbitProducerDataSource() throws SQLException {
		DataSource rabbitProducerDataSource = DataSourceBuilder.create().type(dataSourceType).build();
		LOGGER.info("============= rabbitProducerDataSource : {} ================", rabbitProducerDataSource);
		return rabbitProducerDataSource;
	}
	
    public DataSourceProperties primaryDataSourceProperties(){
        return new DataSourceProperties();
    }
    
    public DataSource primaryDataSource(){
        return primaryDataSourceProperties().initializeDataSourceBuilder().build();
    }
	
}

2. 执行指定的sql脚本 ,BrokerMessageConfiguration.java

/**
 * 	$BrokerMessageConfiguration 
 * 	帮我执行SQL脚本
 * 	帮我进行数据库表结构的创建
 *
 */
@Configuration
public class BrokerMessageConfiguration {

    @Autowired
    private DataSource rabbitProducerDataSource;

    /**
     * 加载 rabbit-producer-message-schema.sql 脚本(这是一个建表语句)
     */
    @Value("classpath:rabbit-producer-message-schema.sql")
    private Resource schemaScript;
    
    @Bean
    public DataSourceInitializer initDataSourceInitializer() {
    	System.err.println("--------------rabbitProducerDataSource-----------:" + rabbitProducerDataSource);
        final DataSourceInitializer initializer = new DataSourceInitializer();
        // 设置之前生成的数据源
        initializer.setDataSource(rabbitProducerDataSource);
        // 执行指定的sql脚本
        initializer.setDatabasePopulator(databasePopulator());
        return initializer;
    }

    /**
     * 执行指定的sql脚本
     * @return
     */
    private DatabasePopulator databasePopulator() {
        final ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
        populator.addScript(schemaScript);
        return populator;
    }
}

3. 接下来是和 Mybatis 配置相关的文件:RabbitProducerMyBatisConfiguration.java

@Configuration
// @AutoConfigureAfter是指等到RabbitProducerDataSourceConfiguration执行完才能执行,即数据源生成之后才能执行当前类
@AutoConfigureAfter(value = {RabbitProducerDataSourceConfiguration.class})
public class RabbitProducerMyBatisConfiguration {

	@Resource(name= "rabbitProducerDataSource")
	private DataSource rabbitProducerDataSource;
	
	@Bean(name="rabbitProducerSqlSessionFactory")
	public SqlSessionFactory rabbitProducerSqlSessionFactory(DataSource rabbitProducerDataSource) {
		
		SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
		bean.setDataSource(rabbitProducerDataSource);
		ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
		try {
			// mapper.xml文件加载,这些配置本可以写在 application.yml 中,但是由于要作为一个基础组件,所以写在代码里,跟业务层面解绑,让业务层面无感知
			bean.setMapperLocations(resolver.getResources("classpath:com/didiok/rabbit/producer/mapping/*.xml"));
			SqlSessionFactory sqlSessionFactory = bean.getObject();
			sqlSessionFactory.getConfiguration().setCacheEnabled(Boolean.TRUE);
			return sqlSessionFactory;
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}
	
	@Bean(name="rabbitProducerSqlSessionTemplate")
	public SqlSessionTemplate rabbitProducerSqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
		return new SqlSessionTemplate(sqlSessionFactory);
	}
	
}

4. Mapper扫描配置相关的文件: RabbitProducerMybatisMapperScanerConfig.java

@Configuration
// @AutoConfigureAfter是指等到RabbitProducerDataSourceConfiguration执行完才能执行,即数据源生成之后才能执行当前类
@AutoConfigureAfter(RabbitProducerDataSourceConfiguration.class)
public class RabbitProducerMybatisMapperScanerConfig {
	
	@Bean(name="rabbitProducerMapperScannerConfigurer")
    public MapperScannerConfigurer rabbitProducerMapperScannerConfigurer() {
        // mapper.java文件加载,这些配置本可以写在 application.yml 中,但是由于要作为一个基础组件,所以写在代码里,跟业务层面解绑,让业务层面无感知
        MapperScannerConfigurer mapperScannerConfigurer = new MapperScannerConfigurer();
        mapperScannerConfigurer.setSqlSessionFactoryBeanName("rabbitProducerSqlSessionFactory");
        mapperScannerConfigurer.setBasePackage("com.didiok.rabbit.producer.mapper");
        return mapperScannerConfigurer;
    }

}

四、可靠性发送消息代码实现

/**
 * 	$RabbitBrokerImpl 真正的发送不同类型的消息实现类
 *
 */
@Slf4j
@Component
public class RabbitBrokerImpl implements RabbitBroker {

	@Autowired
	private RabbitTemplateContainer rabbitTemplateContainer;
	
	@Autowired
	private MessageStoreService messageStoreService;
	
	/**
	 * 可靠性消息发送
	 */
	@Override
	public void reliantSend(Message message) {
		message.setMessageType(MessageType.RELIANT);
		BrokerMessage bm = messageStoreService.selectByMessageId(message.getMessageId());
		if(bm == null) {
			//1. 把数据库的消息发送日志先记录好
			Date now = new Date();
			BrokerMessage brokerMessage = new BrokerMessage();
			brokerMessage.setMessageId(message.getMessageId());
			brokerMessage.setStatus(BrokerMessageStatus.SENDING.getCode());
			//tryCount默认等于0 所以在最开始发送的时候不需要进行设置
			brokerMessage.setNextRetry(DateUtils.addMinutes(now, BrokerMessageConst.TIMEOUT));
			brokerMessage.setCreateTime(now);
			brokerMessage.setUpdateTime(now);
			brokerMessage.setMessage(message);
			messageStoreService.insert(brokerMessage);			
		}
		//2. 执行真正的发送消息逻辑
		sendKernel(message);
	}

	@Override
	public void rapidSend(Message message) {
		// 省略...
	}
	
	/**
	 * 	$sendKernel 发送消息的核心方法 使用异步线程池进行发送消息
	 * @param message
	 */
	private void sendKernel(Message message) {
		AsyncBaseQueue.submit((Runnable) () -> {
			CorrelationData correlationData =
					// 回调函数confirm中需要用到message.getMessageId(), message.getMessageType()。所以可以放在CorrelationData中
					new CorrelationData(String.format("%s#%s#%s",
							message.getMessageId(),
							System.currentTimeMillis(),
							message.getMessageType()));
			String topic = message.getTopic();
			String routingKey = message.getRoutingKey();
			RabbitTemplate rabbitTemplate = rabbitTemplateContainer.getTemplate(message);
			rabbitTemplate.convertAndSend(topic, routingKey, message, correlationData);
			log.info("#RabbitBrokerImpl.sendKernel# send to rabbitmq, messageId: {}", message.getMessageId());			
		});
	}

	@Override
	public void confirmSend(Message message) {
		// 省略...
	}

	@Override
	public void sendMessages() {
		// 省略...
	}

}

并且在回调函数中,也要添加相应的逻辑:

/**
 * 	$RabbitTemplateContainer池化封装
 * 	每一个topic 对应一个RabbitTemplate
 *	1.	提高发送的效率
 * 	2. 	可以根据不同的需求制定化不同的RabbitTemplate, 比如每一个topic 都有自己的routingKey规则
 */
@Slf4j
@Component
public class RabbitTemplateContainer implements RabbitTemplate.ConfirmCallback {

	private Map<String /* TOPIC */, RabbitTemplate> rabbitMap = Maps.newConcurrentMap();
	
	private Splitter splitter = Splitter.on("#");
	
	private SerializerFactory serializerFactory = JacksonSerializerFactory.INSTANCE;
	
	@Autowired
	private ConnectionFactory connectionFactory;
	
	@Autowired
	private MessageStoreService messageStoreService;
	
	public RabbitTemplate getTemplate(Message message) throws MessageRunTimeException {
		// 省略...
	}

	/**
	 * 	无论是 confirm 消息 还是 reliant 消息 ,发送消息以后 broker都会去回调confirm
	 */
	@Override
	public void confirm(CorrelationData correlationData, boolean ack, String cause) {
		// 	具体的消息应答
		List<String> strings = splitter.splitToList(correlationData.getId());
		String messageId = strings.get(0);
		long sendTime = Long.parseLong(strings.get(1));
		String messageType = strings.get(2);
		if(ack) {
			//	当Broker 返回ACK成功时, 就是更新一下日志表里对应的消息发送状态为 SEND_OK
			
			// 	如果当前消息类型为reliant 我们就去数据库查找并进行更新
			if(MessageType.RELIANT.endsWith(messageType)) {
				this.messageStoreService.succuess(messageId);
			}
			log.info("send message is OK, confirm messageId: {}, sendTime: {}", messageId, sendTime);
		} else {
			log.error("send message is Fail, confirm messageId: {}, sendTime: {}", messageId, sendTime);
			
		}
	}
}

上面大部分代码都是在实现迅速类型的消息发送时已经编写了,只是在 confirm()方法中添加了:

            // 	如果当前消息类型为reliant 我们就去数据库查找并进行更新
			if(MessageType.RELIANT.endsWith(messageType)) {
				this.messageStoreService.succuess(messageId);
			}

五、定时任务获取长时间处于待确认状态的消息并重新发送

1. 实现分布式定时任务

这里的定时任务是使用 ElasticJob,并对其进行封装,封装之后的项目为 rabbit-task,封装成为了两个注解 @EnableElasticJob 和 @ElasticJobConfig 。

具体的 ElasticJob 的使用和封装过程可参考教程:ElasticJob使用与封装

2.  将封装好的项目 rabbit-task 添加到 当前项目中并使用

(1)引入 rabbit-task 的依赖

        <dependency>
    		<groupId>com.bfxy.base.rabbit</groupId>
    		<artifactId>rabbit-task</artifactId> 
    		<version>0.0.1-SNAPSHOT</version>			
  		</dependency>

(2)使用注解@EnableElasticJob

在当前项目 rabbit-core-producer 中的 自动装配类 中添加注解 @EnableElasticJob,使得当 应用程序启动的时候,就能对 ZooKeeper注册中心进行初始化,以及 ElasticJob的定时任务解析类 ElasticJobConfParser 的初始化。

/**
 * 	$RabbitProducerAutoConfiguration 自动装配 
 *
 */
@EnableElasticJob
@Configuration
@ComponentScan({"com.didiok.rabbit.producer.*"})
public class RabbitProducerAutoConfiguration {


}

(3)实现定时任务的具体处理逻辑并在类上加注解@EnableElasticJob

这里为了消息的可靠性发送,我们需要抓取 超时却仍处于待确认状态 的消息,进行重新发送消息。这里使用 ElasticJob 的流式定时任务 DataFlowJob。

@Component
@ElasticJobConfig(
		name= "com.bfxy.rabbit.producer.task.RetryMessageDataflowJob",
		cron= "0/10 * * * * ?",
		description = "可靠性投递消息补偿任务",
		overwrite = true,
		shardingTotalCount = 1
		)
@Slf4j
public class RetryMessageDataflowJob implements DataflowJob<BrokerMessage>{

	@Autowired
	private MessageStoreService messageStoreService;
	
	@Autowired
	private RabbitBroker rabbitBroker;
	
	private static final int MAX_RETRY_COUNT = 3;
	
	@Override
	public List<BrokerMessage> fetchData(ShardingContext shardingContext) {
		// 抓取状态为未确认,而且 next_retry 小于当前时间的这些消息,为了确定百分百能发送成功,需要再进行重发
		List<BrokerMessage> list = messageStoreService.fetchTimeOutMessage4Retry(BrokerMessageStatus.SENDING);
		log.info("--------@@@@@ 抓取数据集合, 数量:	{} 	@@@@@@-----------" , list.size());
		return list;
	}

	@Override
	public void processData(ShardingContext shardingContext, List<BrokerMessage> dataList) {
		
		dataList.forEach( brokerMessage -> {
			
			String messageId = brokerMessage.getMessageId();
			if(brokerMessage.getTryCount() >= MAX_RETRY_COUNT) {
				// 重试次数大于3,就不再进行重发了,直接认为发送失败,更改标记为失败
				this.messageStoreService.failure(messageId);
				log.warn(" -----消息设置为最终失败,消息ID: {} -------", messageId);
			} else {
				//	每次重发的时候要更新一下try_count和next_retry字段
				this.messageStoreService.updateTryCount(messageId);
				// 	重发消息
				this.rabbitBroker.reliantSend(brokerMessage.getMessage());
			}
			
		});
	}
}

上面的代码中加入了注解

@ElasticJobConfig(
      name= "com.bfxy.rabbit.producer.task.RetryMessageDataflowJob",
      cron= "0/10 * * * * ?",
      description = "可靠性投递消息补偿任务",
      overwrite = true,
      shardingTotalCount = 1
      )

则该类中的逻辑会定时执行。

对于重发消息的代码 this.rabbitBroker.reliantSend(brokerMessage.getMessage());,之前已经做过说明,这里不再赘述。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/495946.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Formik使用详解

Formik使用详解 1 引言 在现代Web应用程序中&#xff0c;表单是一种不可避免的输入机制&#xff0c;但是处理表单的过程可能会变得非常复杂。Formik是一个React表单库&#xff0c;它的目标是简化表单处理的过程。本文将介绍Formik的主要功能和用途&#xff0c;以及如何使用它来…

怎么成为一名架构师?架构师第一步。基层开发人员逆袭成为架构师真的很难吗?

文章目录 写在前面一、企业需要什么样的架构师1、从招聘软件上了解2、架构师的主要职责与能力 二、成为一名架构师很难吗1、架构师的定义2、当前大部分开发团队的现状3、为什么要有架构师4、技术人员如何自我突破 三、晨钟暮鼓的几句话 写在前面 一个团队中&#xff0c;每个人…

JAVA基础:Scanner类中next(), nextLine(), hasNext(), hasNextLine()

一、next() : 只读缓冲区中空格之前的数据,并且光标指向本行。二、nextLine() : 读取除回车以外的所有符号(整行内容)&#xff0c;光标定位在下一行三、hasNext() &#xff1a;检查下一个标记&#xff08;token&#xff09;&#xff0c;也就是以空格、制表符或换行符为分隔符的…

[JAVA EE]创建Servlet——实现Servlet接口笔记1

创建Servlet的方式之一&#xff1a;实现servlet接口 servlet的生命周期&#xff1a; 1、实例化&#xff1a;创建servlet实例对象 2、初始化&#xff1a;调用init方法完成初始化工作 3、服务&#xff1a;调用service方法来处理用户请求 4、销毁&#xff1a;调用destroy方法…

Java算法比赛常用方法

1. 开方&#xff1a;Math.sqrt(x); 2. x的a方&#xff1a;Math.pow(x,a); 3. 绝对值&#xff1a;Math.abs(x)&#xff1b; 4. BigInteger&#xff1a;大数&#xff08;加&#xff0c;减&#xff0c;乘&#xff0c;除&#xff0c;取余&#xff09; c.add(d) ; c.subtract(d)…

如何用100天彻底学会Python?

Python 是一门功能强大、易于学习且历史悠久的编程语言。如果你希望在短时间内彻底学会 Python&#xff0c;需要制定一个全面的学习计划&#xff0c;并进行刻意的练习和实践。 以下是一份建议的学习计划&#xff0c;帮助你在 100 天内掌握 Python 技能。 第 1-10 天&#xff…

从bootamition出发分析OpenHarmony下Gralloc buffer管理机制

从bootamition出发分析OpenHarmony下Gralloc buffer管理机制 引言 这个文档主要记录从bootamition角度出发&#xff0c;分析OpenHarmony下对gralloc buffer的管理&#xff01;由于OpenHarmony图形子系统过于复杂&#xff0c;且个人由于能力有限&#xff0c;这里我仅从gralloc b…

2023.03青少年机器人技术等级考试理论综合试卷(五级)

2023年3月青少年机器人技术等级考试理论综合试卷&#xff08;五级&#xff09; 一、单选题(共 20 题&#xff0c;共 80 分) 1. 0x35 & 7 的结果是&#xff1f;&#xff08;A &#xff09; A. 5 B. 55 C. 50 D. 54 2.一般状况下&#xff0c;关于主控板的工作电压&#xff0c…

「 Redis 」大key对持久化有什么影响?

「 Redis 」大key对持久化有什么影响&#xff1f; 参考&鸣谢 Redis 大 Key 对持久化有什么影响&#xff1f; XiaoLinCoding 解决了Redis大key问题&#xff0c;同事们都夸他牛皮 大白斯基 快手面试官&#xff1a;Redis变慢了&#xff0c;如何快速排查&#xff1f; Java 那些…

Vue 2.0 学习笔记

Vue学习笔记 文章目录 Vue学习笔记[toc]一、数据代理实现二、事件相关1.事件修饰符2.键盘事件 三、计算属性与监视1.计算属性-computed2.监视-watch 四、条件渲染1.v-show2.v-if&#xff0c;v-else-if 五、循环遍历1.v-for语法2.key的作用与原理 六、内置指令1.v-cloak指令&…

使用GitHub分享项目

一、注册账户 访问GitHub网站&#xff0c;点击“Sign up”按钮开始注册账号。然后按照提示输入你的用户名、电子邮箱地址和密码&#xff0c;提交成功后通过邮箱或你注册的手机号码进行验证身份。 二、上传项目 想分享自己的项目&#xff0c;首先需要在GitHub上创建一个新的仓库…

React + ts学习笔记

前提准备&#xff1a; 环境配置 安装node.js 官网安装&#xff1a;当前使用版本18.15.0 安装新的react应用&#xff1a; 运行命令新建react-app npx create-react-app study-ts-app当前版本&#xff1a; “react”: “^18.2.0”,“react-dom”: “^18.2.0”, 如果出现如…

优维低代码实践:第一个微应用

优维低代码技术专栏&#xff0c;是一个全新的、技术为主的专栏&#xff0c;由优维技术委员会成员执笔&#xff0c;基于优维7年低代码技术研发及运维成果&#xff0c;主要介绍低代码相关的技术原理及架构逻辑&#xff0c;目的是给广大运维人提供一个技术交流与学习的平台。 优维…

vue-element-admin踩坑合集+完整包(项目源码 +依赖)

目录 Nodejs版本&#xff1a; 安装依赖时遇到的报错&#xff1a; 启动报错&#xff1a; vue-element-admin完整包地址&#xff1a; 在部署安装使用vue-element-admin开源项目的时候&#xff0c;会遇到各种各样的问题。 这里是本人遇到的一些坑。。。。。。 Nodejs版本&am…

【技术碎片】【Java】计算椭圆的外接矩形坐标

目录 前言原生实现&#xff08;错误方法&#xff09;精确实现&#xff08;数学解&#xff09;参考 前言 遇到一个需要计算一般椭圆&#xff08;斜椭圆&#xff09;的外接矩形坐标的问题&#xff0c;在此记录一下 已知椭圆的中心点坐标centerX centerY&#xff0c;椭圆的长轴&…

FPGA - 7系列 FPGA内部结构之CLB -02- CLB功能详解

前言 本文翻译自UG474第二章&#xff0c;主要对7系列FPGAs CLB结构进行详细介绍。这些细节对设计优化和验证很有帮助。 CLB 排列 CLB 在 7 系列 FPGA 中按列排列。 7 系列是基于 ASMBL架构提供的独特柱状方法的第四代产品。ASMBL 架构 Xilinx 创建了高级硅模块块 (ASMBL) 架…

【hello Linux】线程互斥

目录 1. 互斥量mutex 2. 互斥量的接口 2.1 初始化互斥量 2.2 销毁互斥量 2.3 互斥量加锁和解锁 2.4 互斥量实现原理探究 3. 可重入VS线程安全 4. 常见锁概念 5. 多线程抢票系统 Linux&#x1f337; 在介绍线程互斥前&#xff0c;我们先来看几个专业性术语&#xff1a; 【临界资…

边缘计算节点是啥?边缘计算与CDN有什么关系?一文带你了解边缘计算节点BEC

边缘计算节点是基于CDN边缘节点构建&#xff0c;覆盖全国大部分地区&#xff0c;三大运营商全覆盖。将算力下沉到各城市级节点&#xff0c;提供离用户更近的算力资源。 那么可能有些小伙伴会问&#xff0c;CDN也是就近为用户提供服务&#xff0c;边缘计算节点和CDN有什么不同呢…

时序数据利用EEMD_LSTM模型进行预测(Python编程,数据集和代码均在压缩包,解压缩后可以直接运行,数据可以替换为股票数据,交通流量等时序数据)

运行效果(为减少录屏时间&#xff0c;视频中epoch设置为30&#xff0c;改为100效果更佳):利用EEMD_LSTM模型对时序数据进行预测&#xff08;视频中epoch为30&#xff0c;当为100 的时候效果更佳&#xff09;_哔哩哔哩_bilibili 1.数据介绍&#xff1a;以每天为间隔的时序数据 …

达梦:dts工具迁移mysql decimal(65,30)的字段,报精度超出定义

本文旨在分享迁移MySQL decimal字段​​​​​​​时遇到“精度超出定义”问题时&#xff0c;如何理解MySQL和达梦对于decimal 等这一类数值数据类型。 1.了解达梦的数值数据类型定义 ​​​​​​​​​​​​​​NUMERIC 类型 语法&#xff1a;NUMERIC[(精度 [, 标度])]功…