本地消息表方案核心思路
需要分布式处理的任务通过消息日志的方式来异步执行。消息日志可以存储到本地文本、数据库或消息队列,再通过业务规则自动或人工发起重试。
方案核心具体实现
包括数据模型和核心逻辑
业务规则
定义业务的消息配置如topic 、key、tag
定义业务的重试策略(最大重试次数、报警方式 短信 钉钉)
数据模型
消息 TransactionMessage
消息内存队列 LinkedBlockedQueue
重试消息内存队列 RetryMessageBlockedQueue
持久层 transaction_message
核心逻辑
本地事务
使用TransactionTemplate 保证业务业务操作和TransactionMessage一并提交事务。
并且分派一个任务给消息消费工人。
注解
对外暴露注解,使用AOP+注解调用本地事务。
消息消费工人 MeesageWorker
1. 负责将本地事务生成的消息保存到消息内存队列。
2. 消费者线程读取消息并发送给Broker。
补偿消息消费工人 RetryMessageWorker
1. 将消费失败的消息保存到重试消费内存队列。每300毫秒重试一次。
2. 高可靠操作。应用启动时将事务消息表未发送成功的消息,再次发送。
重试策略
每个业务自行决定最大重试次数。若达到最大重试次数是报警还是放弃。
TransactionMessage
/**
* 事务消息
* @author jiguansheng
* @date 2023/7/15
**/
@Getter
@Setter
public class TransactionMessage {
/**
* 业务编码
*/
private String businessCode;
/**
* 本地消息事务id
*/
private String transactionId;
/**
* 消息内容
*/
private String body;
/**
* 0 初始化 1 已发送 2发送失败 3 抛弃
*/
private Integer sendStatus;
/**
* 重试次数
*/
private Integer retriesTime;
/**
* 创建时间
*/
private LocalDateTime createTime;
}