f# 1.Docker安装RabbitMQ
docker run -d --name rabbitmq \
-p 5672:5672 -p 25672:25672 -p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin \
--restart=always \
rabbitmq:management
- 5672:AMQP端口
- 25672:集群端口
- 15672:web管理后台端口
2.搭建订单页面环境
2.1.域名映射及网关配置
192.168.139.10 order.gmall.com
网关配置配置
spring:
cloud:
gateway:
route:
- id: gmall_order_route
uri: lb://gmall-order
predicates:
- Host=order.gmall.com
2.2.Nginx动静分离
2.2.1.静态资源
2.2.2.模板页面
- confirm.html:订单确认页面
- list.html:订单列表页面
- detail.html:订单详情页面
- pay.html:支付页面
3.订单
3.1.订单中心
电商系统涉及到3流,信息流,资金流,物流,而订单系统作为中枢,将三者有机的集合起来。在订单中心当中,以信息、资金、物流为基础,贯穿整个订单中心全过程。
- 信息代表订单内由各个系统组合而成的交易信息,同时由完整的交易链路而形成,从商品的交易、支付、再到结算。
- 资金是指用户由交易而形成的资金流动,通过账户余额、或其他支付渠道,形成未结算/已结算资金
- 物流代表形成订单后由商家对用户形成的实体服务,同时结合渠道、类型、服务、信息,进而构成完整的订单体系。
订单模块是电商系统的枢纽,在订单这个环节上需要获取多个模块的数据和信息,同时对这些信息进行加工处理后流向下个环节,这一系列就构成了订单的信息流通。
3.1.1.订单构成
订单中心整体框架图:
3.2.2.订单状态
1. 待付款
用户提交订单后,订单进行预下单,目前主流电商网站都会唤起支付,便于用户快速完成支付,需要注意的是待付款状态下可以对库存进行锁定,锁定库存需要配置支付超时时间,超时后将自动取消订单,订单变更关闭状态。
2. 已付款/待发货
用户完成订单支付,订单系统需要记录支付时间,支付流水单号便于对账,订单下放到WMS系 统,仓库进行调拨,配货,分拣,出库等操作。
3. 待收货/已发货
仓储将商品出库后,订单进入物流环节,订单系统需要同步物流信息,便于用户实时知悉物品物流状态。
4. 已完成
用户确认收货后,订单交易完成。如果订单存在问题则进入到售后状态。
5. 已取消
付款之前取消订单,包括超时未付款或用户商户取消订单都会产生这种订单状态。
6. 售后中
用户在付款后申请退款,或商家发货后用户申请退换货。
售后也同样存在各种状态,当发起售后申请后生成售后订单,售后订单状态为待审核,等待商家审核,商家审核通过后订单状态变更为待退货,等待用户将商品寄回,商家收获后订单
3.2.订单流程
订单流程是指从订单产生到完成整个流转的过程,从而形成了一套标准流程规则。订单包括正向流程和逆向流程,对应场景就是购买商品和退换货流程,正向流程就是一个正常的网购步骤:订单生成->支付订单->卖家发货->确认收货->交易成功。
3.2.1.订单创建与支付
- 订单创建前需要预览订单,选择收货信息等
- 订单创建需要锁定库存,库存有才可创建,否则不能创建
- 订单创建后超时未支付需要解锁库存
- 支付成功后,需要进行拆单,根据商品打包方式,所在仓库,物流等进行拆单
- 支付的每笔流水都需要登记,以待查账
- 订单创建,支付成功等状态都需要给MQ发送消息,方便其他系统感知订阅
3.2.2.逆向流程
- 修改订单,用户没有提交订单,可以对订单一些信息进行修改,比如配送信息,优惠信息,及其他
一些订单可修改范围的内容,此时只需要对数据进行变更即可。 - 订单取消,用户主动取消订单和用户超时未支付,两种情况下都会取消订单,而超时情况是系统自动关闭订单,所以在订单支付和响应机制上面要做支付的限时处理,尤其是在下单减库存的情形下,可以保证快速的释放库存。另外需要处理的是促销优惠中使用的优惠券,权益等视平台规则,进行相应补回给用户。
- 退款,在待发货订单状态下取消订单时,分为缺货退款和用户申请退款。如果是全部退款则订单更新为关闭状态,若只是做部分退款则订单任需进行,同时生成一条退款的售后订单,走退款流程。退款金额需原路返回用户的账户。
- 发货后的退款,发生在仓储货物配送,在配送过程中商品遗失,用户拒收,用户收获后对商品不满意,这样情况下用户发起退款的售后诉求后,需要商户进行退款的审核,双发达成一致后,系统更新退款状态,对订单进行退款操作,金额原路返回用户账户,同时关闭原订单数据。仅退款情况下暂不考虑仓库系统变化。如果发生双方协商不一致情况下,可以申请平台客服介入。在退款订单商户不处理的情况下,系统需要做限期判断,比如5天商户不处理,退款单自动变更同意退款。
3.3.订单确认页实现
OrderWebController
/**
* 订单确认页
* @param model
* @return
*/
@GetMapping("/toTrade")
public String toTrade(Model model, HttpServletRequest request){
try {
OrderConfirmVO orderConfirmVO = orderService.confirmOrder();
model.addAttribute("orderConfirmData", orderConfirmVO);
} catch (Exception e) {
e.printStackTrace();
}
return "confirm";
}
OrderServiceImpl
/**
* 返回订单确认页需要的数据
* @return
* @throws Exception
*/
@Override
public OrderConfirmVO confirmOrder() throws Exception {
OrderConfirmVO confirmVO = new OrderConfirmVO();
MemberVO memberVO = LoginInterceptor.threadLocal.get();
//获取之前的请求
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
CompletableFuture<Void> getAddressFuture = CompletableFuture.runAsync(() -> {
//每一个线程都来共享之前的请求数据
RequestContextHolder.setRequestAttributes(requestAttributes);
//远程查询会员的所有收获地址
List<MemberAddressVO> address = memberFeignService.getAddress(memberVO.getId());
confirmVO.setAddress(address);
}, executor);
CompletableFuture<Void> getItemsFuture = CompletableFuture.runAsync(() -> {
//每一个线程都来共享之前的请求数据
RequestContextHolder.setRequestAttributes(requestAttributes);
//远程查询会员的所有购物项
//feign在远程调用之前要构造请求,调用很多拦截器(RequestInterceptor ... )
List<OrderItemVO> items = cartFeignService.getCartItems();
confirmVO.setItems(items);
}, executor).thenRunAsync(() -> {
List<Long> skuIds = confirmVO.getItems().stream().map(item ->
item.getSkuId()).collect(Collectors.toList());
//远程查询商品库存
List<SkuHasStockTO> skuHasStock = wareFeignService.getSkuHasStock(skuIds);
if (skuHasStock != null) {
Map<Long, Boolean> map = skuHasStock.stream()
.collect(Collectors.toMap(SkuHasStockTO::getSkuId,SkuHasStockTO::getHasStock));
confirmVO.setStocks(map);
}
}, executor);
//用户积分
Integer integration = memberVO.getIntegration();
confirmVO.setIntegration(integration);
CompletableFuture.allOf(getAddressFuture, getItemsFuture).get();
return confirmVO;
}
3.4.Feign远程调用丢失请求头问题
3.4.1.Feign远程同步调用丢失请求头问题
Feign远程同步调用
@Override
public OrderConfirmVO confirmOrder() throws Exception {
OrderConfirmVO confirmVO = new OrderConfirmVO();
MemberVO memberVO = LoginInterceptor.threadLocal.get();
//远程查询会员的所有收获地址
List<MemberAddressVO> address = memberFeignService.getAddress(memberVO.getId());
confirmVO.setAddress(address);
//远程查询会员的所有购物项
//feign在远程调用之前要构造请求,调用很多拦截器(RequestInterceptor ... )
List<OrderItemVO> items = cartFeignService.getCartItems();
confirmVO.setItems(items);
//用户积分
Integer integration = memberVO.getIntegration();
confirmVO.setIntegration(integration);
CompletableFuture.allOf(getAddressFuture, getItemsFuture).get();
return confirmVO;
}
Feign 请求拦截器配置
package com.atguigu.gmall.order.config;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Confĕ guration;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
/**
* Feign 请求拦截器配置 {@link FeignConfig}
*
* @author zhangwen
* @email: 1466787185@qq.com
*/
@Configuration
public class FeignConfig {
@Bean
public RequestInterceptor requestInterceptor(){
return new RequestInterceptor() {
@Override
public void apply(RequestTemplate template) {
ServletRequestAttributes attributes = (ServletRequestAttributes)RequestContextHolder.getRequestAttributes();
HttpServletRequest request = attributes.getRequest();
String cookie = request.getHeader("Cookie");
//新请求需要同步原请求头数据,Cookie
template.header("Cookie", cookie);
}
};
}
}
3.4.2.Feign远程异步调用丢失请求头问题
Feign远程异步调用
@Override
public OrderConfirmVO confirmOrder() throws Exception {
OrderConfirmVO confirmVO = new OrderConfirmVO();
MemberVO memberVO = LoginInterceptor.threadLocal.get();
//获取之前的请求
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
CompletableFuture<Void> getAddressFuture = CompletableFuture.runAsync(() -> {
//每一个线程都来共享之前的请求数据
RequestContextHolder.setRequestAttributes(requestAttributes);
//远程查询会员的所有收获地址
List<MemberAddressVO> address = memberFeignService.getAddress(memberVO.getId());
confirmVO.setAddress(address);
}, executor);
CompletableFuture<Void> getItemsFuture = CompletableFuture.runAsync(() -> {
//每一个线程都来共享之前的请求数据
RequestContextHolder.setRequestAttributes(requestAttributes);
//远程查询会员的所有购物项
//feign在远程调用之前要构造请求,调用很多拦截器(RequestInterceptor ... )
List<OrderItemVO> items = cartFeignService.getCartItems();
confirmVO.setItems(items);
}, executor);
//用户积分
Integer integration = memberVO.getIntegration();
confirmVO.setIntegration(integration);
CompletableFuture.allOf(getAddressFuture, getItemsFuture).get();
return confirmVO;
}
Feign 请求拦截器配置
package com.atguigu.gmall.order.config;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Confĕ guration;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
/**
* Feign 请求拦截器配置 {@link FeignConfig}
*
* @author zhangwen
* @email: 1466787185@qq.com
*/
@Configuration
public class FeignConfig {
@Bean
public RequestInterceptor requestInterceptor(){
return new RequestInterceptor() {
@Override
public void apply(RequestTemplate template) {
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
if (attributes != null) {
HttpServletRequest request = attributes.getRequest();
String cookie = request.getHeader("Cookie");
// 新请求需要同步原请求头数据,Cookie
template.header("Cookie", cookie);
}
}
};
}
}
4.幂等性处理
4.1.什么是幂等性
接口幂等性就是用户对于同一操作发起的一起请求或多次请求的结果是一致的,不会因为多次点击而产生副作用;
比如说支付场景,用户购买了商品,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击支付,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条,这就没有保证接口的幂等性。
4.2.哪些情况需要防止
- 用户多次点击按钮
- 用户页面回退再次提交
- 微服务相互调用,由于网络原因,导致请求失败,feign触发重试机制
- 其他业务情况…
4.3.什么情况下需要幂等
以SQL为例,有些操作是天然幂等的。
SELECT * FROM user WHERE id = 1
无论执行多少次都不会改变状态,是天然幂等。
UPDATE user SET unick = ‘atguigu’ WHERE uid = 1
多次操作,结果一样,具有幂等性。
INSERT INTO user (uid, uname) VALUES (1, ‘atguigu’)
如uid为唯一主键,即重复操作上面的业务,只会插入1条用户数据,具备幂等性。
UPDATE user SET age = age + 1 WHERE uid = 1
叠加更新每次执行的结果都会发生变化,不是幂等的。
INSERT INTO user (uid, uname) VALUES (1, ‘atguigu’)
如uid不是主键,可以重复,那上面的业务多次操作,数据会新增多条,不具备幂等。
4.4.幂等解决方案
4.4.1.Token机制
原理:
- 服务端提供了发送token的接口。我们在分析业务的时候,哪些业务是存在幂等问题的,就必须在执行业务之前,先去获取token,服务器会把token保存到redis中。
- 然后调用业务接口请求时,把token携带过去,一般放在请求头中。
- 服务器判断token是否存在redis中,存在表示第一次请求,然后删除token,继续执行业务。
- 如果判断token不存在redis中,就表示是重复操作,直接返回重复标记给client,注意就保证了业务代码不被重复执行。
危险性:
-
先删除token,还是后删除token
- 先删除可能导致,业务确实没有执行,重试还带上之前token,由于防重设计导致请求还是不能执行
- 后删除可能导致,业务处理成功,但是服务闪断,出现超时,没有删除token,别人继续重试,导致业务被执行两遍
-
token获取,比较和删除必须是原子性
- 可以在redis使用lua脚本完成这个操作
if redis.call('get',KEYS[1]) == ARGV[1]
then
return redis.call('del',KEYS[1])
else
return 0
end
4.4.2.各种锁机制
1)数据库悲观锁
SELECT * FROM TABLE WHERE id = 1 FOR UPDATE
悲观锁使用时一般伴随事务一起使用,数据锁定时间可能会很长,需要根据实际情况选用。另外要注意的是,id字段一定是注解或者唯一索引,不然可能造成锁表的结果,处理起来会非常麻烦。
2)数据库乐观锁
这张方法适合在更新的场景中,处理读多写少的问题
UPDATE t_goods SET count = count-1, version = version+1 WHERE good_id = 1 AND version = 1
根据version版本,也就是在操作库存前先获取当前商品的version版本号,然后操作的时候带上此version号。
3)业务层分布式锁
如果多个机器可能在同一时间同时处理相同的数据,比如多台机器定时任务都拿到了相同数据处理,我们就可以加分布式锁,锁定此数据,处理完成后释放锁。获取到锁的必须先判断这个数据是否被处理过。
4.4.3.各种唯一约束
1)数据库唯一约束
2)redis set防重
很多数据需要处理,只能被处理一次,比如我们可以计算数据的MD5将其放入redis的set,每次处理数据,先看这个MD5是否已经存在,存在就不处理。(网盘文件秒传)
4.4.4.防重表
使用订单号orderNo作为去重表的唯一索引,把唯一索引插入去重表,再进行业务操作,且他们在同一个事务中。这个保证了重复请求时,因为去重表有唯一约束,导致请求失败,避免了幂等问题。这里要注意的是,去重表和业务表应该在同一库中,这样就保证在同一个事务,即使业务操作失败了,也会把去重表的数据回滚。这个很好的保证了数据一致性。
4.4.5.全局请求唯一ID
调用接口时,生成一个唯一id,redis将数据保存到集合中(去重),存在即处理过。
可以使用nginx设置每一个请求的唯一id
proxy_set_header X-Request-Id $reqeust_id;
5.下单操作
5.1下单流程
5.2.下单实现
OrderWebController
/**
* 下单功能
* - 创建订单,验证令牌,锁库存 ю
* - 下单成功跳转到支付页
* - 下单失败回到订单确认页,重新确认订单信息
* @param vo
* @return
*/
@PostMapping("/submitOrder")
public String submitOrder(OrderSubmitVO vo, Model model, RedirectAttributes redirectAttributes){
try {
OrderSubmitResponseVO responseVO = orderService.submitOrder(vo);
if (responseVO.getCode() != 0){
String msg = "下单失败:";
switch (responseVO.getCode()){
case 1: msg += OrderCheckEnum.TOKEN_EXCEPTION.getMsg(); break;
case 2: msg += OrderCheckEnum.PRICE_EXCEPTION.getMsg(); break;
case 3: msg += OrderCheckEnum.STOCK_EXCEPTION.getMsg(); break;
default: msg += "系统未知异常";
}
redirectAttributes.addFlashAttribute("msg", msg);
return "redirect:http://order.gmall.com/toTrade";
}
model.addAttribute("orderSubmitResponse", responseVO);
} catch (Exception e) {
e.printStackTrace();
}
return "pay";
}
6.本事事务与分布式事务
6.1.本地事务
6.1.1.事务的基本特性
数据库事务具有四个特征:原子性( Atomicity )、一致性( Consistency )、隔离性( Isolation )和 持久性( Durability )。这四个特性简称为 ACID 特性。
- 原子性(Atomicity):一系列的操作整体不可拆分,要么同时成功,要么同时失败
- 一致性(Consistency):数据在事务的前后,业务整体一致
- 隔离性(Isolation):事务之间互相隔离
- 持久性(Durability):一旦事务成功,数据一定会落盘在数据库
6.1.2.事务的隔离级别
-
READ_UNCOMMITTED(读未提交)
- 该隔离级别的事务会读到其它未提交事务的数据,此现象也称之为脏读
-
READ_COMMITED(读已提交)
- 一个事务可以读取另一个已提交的事务,多次读取会造成不一样的结果,此现象称之为不可重复读问题,Oracle和SQL Server的默认隔离级别
-
REPEATABLE_READ(可重复读)
- 该隔离级别是MySQL默认的隔离级别,在同一个事务里,select的结果是事务开始时时间点的状态,因此,同样的select操作读到的结果会是一致的,但是,会有幻读现象。MySQL的InnoDB引擎可以通过next-key locks机制来避免幻读。
-
SERLALIZABLE(串行化)
- 在该隔离级别下事务都是串行顺序执行的,MySQL数据库的InnoDB引擎会给读操作隐式加一把读共享锁,从而避免了脏读、不可重复读和幻读问题。
6.1.3.事务的传播行为
- PROPAGATION_REQUIRED: 如果存在一个事务,则支持当前事务。如果没有事务则开启
- PROPAGATION_SUPPORTS:如果存在一个事务,支持当前事务。如果没有事务,则非事务的执行
- PROPAGATION_MANDATORY:如果已经存在一个事务,支持当前事务。如果没有一个活动的事务,则抛出异常。
- PROPAGATION_REQUIRES_NEW:总是开启一个新的事务。如果一个事务已经存在,则将这个存在的事务挂起。
- PROPAGATION_NOT_SUPPORTED:总是非事务地执行,并挂起任何存在的事务。
- PROPAGATION_NEVER:总是非事务地执行,如果存在一个活动事务,则抛出异常
- PROPAGATION_NESTED:如果一个活动的事务存在,则运行在一个嵌套的事务中.
如果没有活动事务,则按TransactionDefinition.PROPAGATION_REQUIRED 属性执行
//a事务的所有设置就传播到了和他共用一个事务的方法
@Transactional(timeout = 30)
public void a(){
b(); //a事务
c(); //新事务(不回滚)
int i = 10 / 0;
}
@Transactional(timeout = 10)
public void b(){
//执行了20s
}
@Transactional(propagation = Propagation.REQUIRES_NEW, timeout = 20)
public void c(){
}
6.1.4.Spring Boot事务关键点
1.事务的自动配置
TransactionAutoConfiguration
2.事务的坑
同一个对象内事务方法互调默认失效,在同一个类里面,编写两个方法,内部调用的时候,会导致事务设置失效。原因是没有用到代理对象的缘故。
//a,b,c三个方法在同一个service
//b,c做任何设置都没用,都是和a共用一个事务
@Transactional(timeout = 30)
public void a(){
b(); //a事务
c(); //a事务,c设置的事务无效
}
@Transactional(timeout = 10)
public void b(){
}
@Transactional(propagation = Propagation.REQUIRES_NEW, timeout = 20)
public void c(){
}
解决:使用代理对象来调用事务方法
- 导入spring-boot-starter-aop
- @EnableTransactionManagement(proxyTargetClass = true)
- @EnableAspectJAutoProxy(exposeProxy = true) 开启AspectJ动态代理功能
- AopContext.currentProxy() 获取代理对象来调用事务方法
@Transactional(timeout = 30)
public void a(){
//使用代理对象来调用事务方法
OrderServiceImpl orderService = (OrderServiceImpl)AopContext.currentProxy();
orderService.b();
orderService.c();
}
@Transactional(timeout = 10)
public void b(){
}
@Transactional(propagation = Propagation.REQUIRES_NEW, timeout = 20)
public void c(){
}
6.2.分布式事务
6.2.1.为什么要有分布式事务
分布式系统经常出现的异常:机器宕机,网络异常,消息丢失,消息乱序,数据错误,不可靠的TCP,存储数据丢失…
分布式事务是企业集成中的一个技术难点,也是每一个分布式系统架构中都会涉及到的一个东西,特别是微服务架构中,几乎可以说是无法避免。
6.2.2.CAP定理与BASE理论
1.CAP定理
CAP原则又称CAP定理,指的是在一个分布式系统中, Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可得兼。
- 一致性(C):在分布式系统中的所有数据备份,在同一时刻是否同样的值。(等同于所有节点访问同一份最新的数据副本)
- 可用性(A):在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。(对数据更新具备高可用性)
- 分区容忍性(P):以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择。
一般来说,分区容错无法避免,因此可以认为CAP的P总是成立。那么剩下的C和A无法同时做到。
分布式系统中实现一致性的raft算法,Paxos算法等
raft算法动画演示:http://thesecretlivesofdata.com/raft 、http://raft.github.io
CAP三个特性只能满足其中两个,那么取舍的策略就共有三种:
CA withoutP:如果不要求P(不允许分区),则C(强一致性)和A(可用性)是可以保证的。但放弃P的同时也就意味着放弃了系统的扩展性,也就是分布式节点受限,没办法部署子节点,这是违背分布式系统设计的初衷的。传统的关系型数据库RDBMS:Oracle、MySQL就是CA。
CP without A:如果不要求A(可用),相当于每个请求都需要在服务器之间保持强一致,而P(分
区)会导致同步时间无限延长(也就是等待数据同步完才能正常访问服务),一旦发生网络故障或者消息丢失等情况,就要牺牲用户的体验,等待所有数据全部一致了之后再让用户访问系统。设计成
CP的系统其实不少,最典型的就是分布式数据库,如Redis、HBase等。对于这些分布式数据库来说,数据的一致性是最基本的要求,因为如果连这个标准都达不到,那么直接采用关系型数据库就好,没必要再浪费资源来部署分布式数据库。
AP wihtoutC:要高可用并允许分区,则需放弃一致性。一旦分区发生,节点之间可能会失去联系,为了高可用,每个节点只能用本地数据提供服务,而这样会导致全局数据的不一致性。典型的应用就如某米的抢购手机场景,可能前几秒你浏览商品的时候页面提示是有库存的,当你选择完商品准备下单的时候,系统提示你下单失败,商品已售完。这其实就是先在
A(可用性)方面保证系统可以正常的服务,然后在数据的一致性方面做了些牺牲,虽然多少会影响一些用户体验,但也不至于造成用户购物流程的严重阻塞。
2.面临的问题
对于大多数互联网应用的场景,主机众多,部署分散,而且现在的集群规模越来越大,所以节点故障,网络故障是常态,而且要保证服务可用性达到99.99999%(N个9),即保证P和A,舍弃C。
3.BASE理论
是对CAP理论的延伸,思想是即使无法做到强一致性(CAP的一致性就是强一致性),但可以适当的采用弱一致性,即最终一致性。
BASE是Basically Available(基本可用)、Soft state(软状态)和Eventually consistent(最终一致性)三个短语的简写,BASE是对CAP中一致性和可用性权衡的结果,其来源于对大规模互联网系统分布式实践的结论,是基于CAP定理逐步演化而来的,其核心思想是即使无法做到强一致性(Strongconsistency),但每个应用都可以根据自身的业务特点,采用适当的方式来使系统达到最终一致性(Eventual consistency)。接下来我们着重对BASE中的三要素进行详细讲解。
基本可用(Basically Available)
基本可用是指分布式系统在出现不可预知故障的时候,允许损失部分可用性——但请注意,这绝不等价于系统不可用,以下两个就是“基本可用”的典型例子。
- 响应时间上的损失:正常情况下,一个在线搜索引擎需要0.5秒内返回给用户相应的查询结果,但由于出现异常(比如系统部分机房发生断电或断网故障),查询结果的响应时间增加到了1~2秒。
- 功能上的损失:正常情况下,在一个电子商务网站上进行购物,消费者几乎能够顺利地完成每一笔订单,但是在一些节日大促购物高峰的时候,由于消费者的购物行为激增,为了保护购物系统的稳定性,部分消费者可能会被引导到一个降级页面。
软状态/弱状态(Soft State)
是指允许系统中的数据存在中间状态,并认为该中间状态的存在不会影响系统的整体可用性,即允许系统在不同节点的数据副本之间进行数据听不的过程存在延时。
最终一致性(Eventual Consistency)
最终一致性强调的是系统中所有的数据副本,在经过一段时间的同步后,最终能够达到一个一致的状态。因此,最终一致性的本质是需要系统保证最终数据能够达到一致,而不需要实时保证系统数据的强一致性。
4.强一致性,弱一致性,最终一致性
从客户端角度,多进程并发访问时,更新过的数据在不同进程如何获取的不同策略,决定了不同的一致性。对于关系型数据库,要求更新过的数据能被后续的访问都能看到,这是强一致性。如果能容忍后续的部分或者全部访问不到,则是弱一致性。如果经过一段时间后要求能访问到更新后的数据,则是最终一致性。
6.2.3.分布式事务几种方案
1.2PC模式/3PC模式
数据库支持的2PC(2 phase commit),又叫做 XA Transactions。
MySQL从5.5版本开始支持,SQL Server 2005开始支持,Oracle 7开始支持。其中,XA是一个两阶段提交协议,该协议分为以下两个阶段:
- 第一阶段:事务协调器要求每个涉及到事务的数据库预提交(precommit)此操作,并反映是否可以提交
- 第二阶段:事务协调器要求每个数据库提交数据。
其中,如果有任何一个数据库否决此次提交,那么所有数据库都会被要求回滚他们在此事务中的那部分信息。
- XA协议比较简单,而且一旦商业数据库实现了XA协议,使用分布式事务的成本也比较低。
- XA性能不理想,特别是在交易下单链路,往往并发量很高,XA无法满足高并发场景
- XA目前在商业数据库支持的比较理想,在MySQL数据库中支持的不理想,MySQL的XA实现,没有记录prepare阶段日志,主备切换回导致主库与备库数据不一致。
- 许多NoSQL也没有支持XA,这让XA的应用场景变动非常狭隘。
- 也有3PC,引入了超时机制(无论协调者还是参与者,在向对方发送请求后,若长时间未收到回应则做出相应处理)
3PC是2PC的改进版,实质是将2PC中提交事务请求拆分为两步,形成了CanCommit、 PreCommit、
doCommit三个阶段的事务一致性协议。
3PC相较于2PC而言,解决了协调者挂点后参与者无限阻塞和单点问题,但是仍然无法解决网络分
区问题
2.柔性事务-TCC事务补偿型方案
- 刚性事务:遵循ACID原则,强一致性
- 柔性事务:遵循BASE理论,最终一致性
- 与刚性事务不同,柔性事务允许一定时间内,不同节点的数据不一致,但要求最终一致
- 一阶段 prepare 行为:调用自定义的prepare逻辑
- 二阶段commit行为:调用自定义的commit逻辑
- 二阶段rollback行为:调用自定义的rollback逻辑
所谓TCC模式,是指支持把自定义的分支事务纳入到全局事务的管理中。(3PC手动版)
3.柔性事务-最大努力通知
按规律进行通知,不保证数据一定能通知成功,但会提供可查询操作接口进行核对。这种方案主要用在与第三方系统通讯时,比如:调用微信支付或支付宝支付后的支付结果通知。这种方案也是结合MQ进行实现。例如:通过MQ发送http请求,设置最大通知次数,达到通知次数后即不再通知。
案例:银行通知,商户通知等(各大交易业务平台间的商户通知:多次通知、查询校对、对账文件),支付宝的支付成功异步回调。
4.柔性事务-可靠消息+最终一致性方案(异步确保型)
实现:业务处理服务在业务事务提交之前,向实时消息服务请求发送消息,实时消息服务只记录消息数据,而不是真正的发送。业务处理服务在业务事务提交之后,向实时消息服务确认发送。只有在得到确认发送指令后,实时消息服务才会真正发送消息。
步骤如下:
- 服务A向消息中间件发送一条预备消息。
- 消息中间件保存预备消息并返回成功。
- 服务A执行本地事务。
- 服务A发送提交消息给消息中间件,服务B接收到消息之后执行本地事务。
基于消息中间件的两阶段提交往往用在高并发场景下,将一个分布式事务拆成一个消息事务(服务A的本地操作+发消息)+服务B的本地操作,其中服务B的操作由消息驱动,只要消息事务成功,那么服务A一定成功,消息也一定发出来了,这时候服务B会收到消息去执行本地操作,如果本地操作失败,消息会重投,直到服务B操作成功,这样就变相地实现了A与B的分布式事务。
7.Seata实现分布式事务
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
7.1.创建uodo_log表
给每一个微服务数据库都创建 undo_log 表
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modifĕ ed` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
7.2.安装,配置,启动seata-server服务
安装Seata事务协调器 seata-server 并配置启动
- 从 https://github.com/seata/seata/releases,下载服务器软件包,将其解压缩
- registry.conf:注册中心配置
- 注册中心使用Nacos
registry {
type = "nacos"
nacos {
application = "seata-server"
serverAddr = "192.168.139.10:8848"
group = "SEATA_GROUP"
namespace = "fa4978dd-5532-4534-be18-2c99205dc7b1"
cluster = "default"
username = "nacos"
password = "nacos"
}
}
-
- 配置中心使用Nacos
config {
type = "nacos"
nacos {
application = "seata-server"
serverAddr = "192.168.139.10:8848"
group = "SEATA_GROUP"
namespace = "d78e3e40-c502-4d56-b267-163b30c1c38a"
cluster = "default"
username = "nacos"
password = "nacos"
}
}
- file.conf:事务日志存储配置
- 文件存储全局事务日志
- DB存储全局事务日志
## transaction log store, only used in seata-server
store {
## store mode: file、db、redis
mode = "db"
## database store property
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari)etc.
datasource = "druid"
## mysql/oracle/postgresql/h2/oceanbase etc.
dbType = "mysql"
driverClassName = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://192.168.139.10:3306/seata"
user = "root"
password = "root"
minConn = 5
maxConn = 100
globalTable = "global_table"
branchTable = "branch_table"
lockTable = "lock_table"
queryLimit = 100
maxWait = 5000
}
}
创建 seata 数据库,并创建三张表
-- the table to store GlobalSession data
drop table if exists `global_table`;
create table `global_table` (
`xid` varchar(128) not null,
`transaction_id` bigint,
`status` tinyint not null,
`application_id` varchar(32),
`transaction_service_group` varchar(32),
`transaction_name` varchar(128),
`timeout` int,
`begin_time` bigint,
`application_data` varchar(2000),
`gmt_create` datetime,
`gmt_modified` datetime,
primary key (`xid`),
key `idx_gmt_modified_status` (`gmt_modified`, `status`),
key `idx_transaction_id` (`transaction_id`)
);
-- the table to store BranchSession data
drop table if exists `branch_table`;
create table `branch_table` (
`branch_id` bigint not null,
`xid` varchar(128) not null,
`transaction_id` bigint ,
`resource_group_id` varchar(32),
`resource_id` varchar(256) ,
`lock_key` varchar(128) ,
`branch_type` varchar(8) ,
`status` tinyint,
`client_id` varchar(64),
`application_data` varchar(2000),
`gmt_create` datetime,
`gmt_modified` datetime,
primary key (`branch_id`),
key `idx_xid` (`xid`)
);
-- the table to store lock data
drop table if exists `lock_table`;
create table `lock_table` (
`row_key` varchar(128) not null,
`xid` varchar(96),
`transaction_id` long ,
`branch_id` long,
`resource_id` varchar(256) ,
`table_name` varchar(32) ,
`pk` varchar(36) ,
`gmt_create` datetime ,
`gmt_modified` datetime,
primary key(`row_key`)
);
-
- Redis存储全局事务日志
-
把Seata配置信息导入到Nacos配置中心
将 config.txt 文件上传到 seata/config.txt
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefĕ x=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
# 分布式事务的服务
service.vgroupMapping.gmall-order-service-group=default
service.vgroupMapping.gmall-ware-service-group=default
service.default.grouplist=192.168.139.10:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
store.mode=file
store.fĕ le.dir=file_store/data
store.fĕ le.maxBranchSessionSize=16384
store.fĕ le.maxGlobalSessionSize=512
store.fĕ le.fĕ leWriteBufferCacheSize=16384
store.fĕ le.flushDiskMode=async
store.fĕ le.sessionReloadReadSize=100
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
# seata数据库链接地址
store.db.url=jdbc:mysql:// 192.168.139.10:3306/seata?useUnicode=true
store.db.user=root
store.db.password=root
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
store.redis.host=127.0.0.1
store.redis.port=6379
store.redis.maxConn=10
store.redis.minConn=1
store.redis.database=0
store.redis.password=null
store.redis.queryLimit=100
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
log.exceptionRate=100
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898
将nacos-config.sh上传到 seata/conf/nacos-config.sh
while getopts ":h:p:g:t:u:w:" opt
do
case $opt in
h)
host=$OPTARG
;;
p)
port=$OPTARG
;;
g)
group=$OPTARG
;;
t)
tenant=$OPTARG
;;
u)
username=$OPTARG
;;
w)
password=$OPTARG
;;
?)
echo " USAGE OPTION: $0 [-h host] [-p port] [-g group] [-t tenant] [-u username] [-w password] "
exit 1
;;
esac
done
if [[ -z ${host} ]]; then
host=localhost
fi
if [[ -z ${port} ]]; then
port=8848
fi
if [[ -z ${group} ]]; then
group="SEATA_GROUP"
fi
if [[ -z ${tenant} ]]; then
tenant=""
fi
if [[ -z ${username} ]]; then
username=""
fi
if [[ -z ${password} ]]; then
password=""
fi
nacosAddr=$host:$port
contentType="content-type:application/json;charset=UTF-8"
echo "set nacosAddr=$nacosAddr"
echo "set group=$group"
failCount=0
tempLog=$(mktemp -u)
function addConfig() {
curl -X POST -H "${contentType}" "http: Ѯ $nacosAddr/nacos/v1/cs/configs?
dataId=$1&group=$group&content=$2&tenant=$tenant&username=$username&password
=$password" >"${tempLog}" 2>/dev/null
if [[ -z $(cat "${tempLog}") ]]; then
echo " Please check the cluster status. "
exit 1
fi
if [[ $(cat "${tempLog}") =~ "true" ]]; then
echo "Set $1=$2 successfully "
else
echo "Set $1=$2 failure "
(( failCount++ ))
fi
}
count=0
for line in $(cat $(dirname "$PWD")/config.txt | sed s/[[:space:]]//g); do
(( count++ ))
key=${line%%=*}
value=${line#*=}
addConfig "${key}" "${value}"
done
echo
"======================================================================="
echo " Complete initialization parameters, total-count:$count , failure-count:$failCount "
echo
"======================================================================="
if [[ ${failCount} -eq 0 ]]; then
echo " Init nacos config finished, please start seata-server. "
else
echo " init nacos config fail. "
fi
执行nacos-config.sh
sh nacos-config.sh -h localhost -p 8848 -g SEATA_GROUP -t
d78e3e40-c502- 4d56-b267-163b30c1c38a -u nacos -w nacos
命令解析:-h -p 指定nacos的端口地址;-g 指定配置的分组,注意,是配置的分组;-t 指定命名空间id; -u -w指定nacos的用户名和密码,同样,这里开启了nacos注册和配置认证的才需要指定。
进入Nacos配置中心,在stata命名空间下可以看到配置写入成功
启动seata-server
sh seata-server.sh -p 8091 -m file
7.3.整合Seata
7.3.1.导入依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
7.3.2.DataSourceProxy
所有需要用到分布式事务的微服务使用seata DataSourceProxy代理自己的数据源
package com.atguigu.gmall.ware.config;
import com.zaxxer.hikari.HikariDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
import javax.sql.DataSource;
/**
* Seata 配置 {@link SeataConfig}
*
* @author zhangwen
* @email: 1466787185@qq.com
*/
@Configuration
public class SeataConfig {
@Bean
public DataSource dataSource(DataSourceProperties dataSourceProperties){
HikariDataSource dataSource = dataSourceProperties.initializeDataSourceBuilder()
.type(HikariDataSource.class).build();
if (StringUtils.hasText(dataSourceProperties.getName())) {
dataSource.setPoolName(dataSourceProperties.getName());
}
//返回seata代理数据源
return new DataSourceProxy(dataSource);
}
}
7.3.3.Seata配置指定从配置中心获取
seata.config.type=nacos
seata.config.nacos.group=SEATA_GROUP
seata.config.nacos.server-addr=192.168.139.10:8848
seata.config.nacos.namespace=d78e3e40-c502-4d56-b267-163b30c1c38a
seata.config.nacos.username=nacos
seata.config.nacos.password=nacos
seata.tx-service-group=gmall-order-service-group
seata.service.vgroup-mapping.gmall-order-service-group=default
7.3.4.@GlobalTransactional
在主业务方法上使用 @GlobalTransactional 标注全局事务,子业务使用 Transactional 标注
/**
* 下单操作
*/
@GlobalTransactional
@Transactional
@Override
public OrderSubmitResponseVO submitOrder(OrderSubmitVO vo) {
//远程锁定订单商品库存
R r = wareFeignService.orderLockStock(lockVO);
if (r.getCode() != 0) {
//锁定失败,订单和库存都需要回滚
response.setCode(OrderCheckEnum.STOCK_EXCEPTION.getCode());
return response;
}
}
7.4.总结
- Seata的AT模式不适合高并发场景
- 高并发场景考虑使用消息事务
- 最大努力通知
- 可靠消息+最终一致性
8.监听库存解锁
8.1.整合RabbitMQ
8.1.1.导入依赖
<dependency>
<groupId>org.springframework.boot-groupId>
<artifactId>spring-boot-starter-amqp-artifactId>
</dependency>
8.1.2.配置RabbitMQ
spring:
rabbitmq:
addresses: 192.168.139.10:5672
virtual-host: /
username: admin
password: admin
listener:
simple:
acknowledge-mode: manual
8.1.3.主启动类标注注解
@EnableRabbit
@EnableFeignClients
@EnableDiscoveryClient
@SpringBootApplication
public class GmallWareApplication {
public static void main(String[] args) {
SpringApplication.run(GmallWareApplication.class, args);
}
}
8.2.库存解锁实现
8.2.1.库存解锁流程
8.2.2.RabbitMQ配置
package com.atguigu.gmall.ware.config;
import com.atguigu.gmall.ware.constant.RabbitConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* RabbitMQ配置 {@link RabbitConfig}
*
* @author zhangwen
* @email: 1466787185@qq.com
*/
@Configuration
public class RabbitConfig {
/**
* 使用JSON序列化机制进行消息转换
* @return
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
@Bean
public Exchange stockEventExchange(){
return new TopicExchange(RabbitConstant.STOCK_EVENT_EXCHANGE, true, false);
}
@Bean
public Queue stockReleaseQueue(){
return new Queue(RabbitConstant.STOCK_RELEASE_QUEUE, true, false, false);
}
/**
* 延时队列
* 队列里的消息过期后路由到死信交换机
* @return
*/
@Bean
public Queue stockDelayQueue(){
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", RabbitConstant.STOCK_EVENT_EXCHANGE);
arguments.put("x-dead-letter-routing-key", RabbitConstant.STOCK_DELAY_DEAD_LETTER_ROUTING_KEY);
arguments.put("x-message-ttl", 120000);
return new Queue(RabbitConstant.STOCK_DELAY_QUEUE, true, false, false, arguments);
}
@Bean
public Binding stockReleaseBinding(){
return new Binding(RabbitConstant.STOCK_RELEASE_QUEUE, Binding.DestinationType.QUEUE,
RabbitConstant.STOCK_EVENT_EXCHANGE, RabbitConstant.STOCK_RELEASE_QUEUE_ROUTING_KEY, null);
}
@Bean
public Binding stockLockedBinding(){
return new Binding(RabbitConstant.STOCK_DELAY_QUEUE, Binding.DestinationType.QUEUE,
RabbitConstant.STOCK_EVENT_EXCHANGE, RabbitConstant.STOCK_DELAY_QUEUE_ROUTING_KEY, null);
}
}
8.2.3.RabbitMQ监听
/**
* Rabbit 监听器 {@link StockReleaseListener}
*
* @author zhangwen
* @email: 1466787185@qq.com
*/
@RabbitListener(queues = RabbitConstant.STOCK_RELEASE_QUEUE)
@Slf4j
@Component
public class StockReleaseListener {
@Autowired
private WareSkuService wareSkuService;
/**
* 监听自动解锁库存
*/
@RabbitHandler
public void handleStockLockedRelease(StockLockedTO lockedTO, Message message,Channel channel) {
log.error("收到解锁库存信息");
try {
wareSkuService.unLockStock(lockedTO);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
}
8.2.4.解锁库存业务逻辑实现
/**
* 自动解锁库存
* @param lockedTO
*/
@Override
public void unLockStock(StockLockedTO lockedTO) {
// 1.查询数据库关于这个订单的锁定库存信息
// 有记录:证明库存锁定成功
// - 解锁
// - 订单情况:
// - 1. 没有这个订单,必须解锁
// - 2. 有这个订单,根据订单状态,已取消:解锁库存,没有取消:不能解锁
// 无记录:库存锁定失败,库存回滚了,这种情况无需解锁
StockDetailTO detailTO = lockedTO.getDetailTO();
WareOrderTaskDetailEntity taskDetailEntity = wareOrderTaskDetailService.getById(detailTO.getId());
if (taskDetailEntity != null) {
//解锁
Long id = lockedTO.getId();
WareOrderTaskEntity taskEntity = wareOrderTaskService.getById(id);
//根据订单号查询订单状态
R r = orderFeignService.getOrderStatus(taskEntity.getOrderSn());
if (r.getCode() == 0) {
OrderVO orderVO = r.getData("data", new TypeReference<OrderVO>() {});
//没有订单,订单取消,都必须解锁库存
if (orderVO == null || OrderStatusEnum.CANCLED.getCode().equals(orderVO.getStatus())) {
//当前库存工作单的状态为已锁定状态,才需要解锁
if (StockLockStatusEnum.LOCKED.getCode().equals(taskDetailEntity.getLockStatus())) {
unLockStock(detailTO.getSkuId(), detailTO.getWareId(),
detailTO.getSkuNum(), detailTO.getId());
}
}
} else {
log.error("调用远程服务 yomallб order 查询订单异常");
throw new WareException("调用远程服务 yomallб order 查询订单异常");
}
}
}
/**
* 解锁库存
*/
private void unLockStock(Long skuId, Long wareId, Integer num, Long taskDetailId)
{
//更新锁定库存
wareSkuDao.unLockStock(skuId, wareId, num);
//更新库存工作单状态
WareOrderTaskDetailEntity taskDetailEntity = new WareOrderTaskDetailEntity();
taskDetailEntity.setId(taskDetailId);
taskDetailEntity.setLockStatus(StockLockStatusEnum.UNLOCK.getCode());
wareOrderTaskDetailService.updateById(taskDetailEntity);
}
9.定时关单与库存解锁
9.1.流程
10.消息可靠性
10.1.消息丢失
10.1.1.消息发送出去,由于网络问题没有抵达服务器
- 保证消息一定会发送出去,每一个消息做好日志记录(写入消息表) 定期扫描消息表,将失败的消息再次发送
消息表 mq_message
CREATE TABLE mq_message
(
message_id char(32) NOT NULL,
content text,
to_exchange varchar(255) DEFAULT NULL,
routing_key varchar(255) DEFAULT NULL,
class_type varchar(255) DEFAULT NULL,
message_status int(1) DEFAULT '0' COMMENT '0-新建 1-已发送 2-错误抵达 3-已抵达',
create_time datetime DEFAULT NULL,
update_time datetime DEFAULT NULL,
PRIMARY KEY('message_id')
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
10.1.2.消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功。此时Broker尚未持久化完成,宕机
- publisher必须加入确认回调机制(ConfirmCallback,ReturnCallback)
- ConfirmCallback,修改消息表消息状态为已抵达
- ReturnCallback,修改消息表消息状态为错误
10.1.3.自动ACK模式,消费者收到消息,但没来得及消费,宕机
一定开启手动ACK,消费成功才移除,失败或者没来得及处理就NoACK,并将消息重回队列
10.1.4.总结
-
做好消息确认机制
- Publisher,ConfirmCallback 、ReturnCallback
- Consumer,手动ACK
-
每一条消息都在数据库做好记录,定期将失败的消息重新发送
10.2.消息重复
- 消息消费成功,事务已经提交,ack时,机器宕机,导致没有ack成功,Broker的消息重新由unack变为ready,并发送给其他消费者
- 消息消费失败,由于重试机制,自动又将消息发送出去
- 成功消费,ack时宕机,消息由unack变为ready,Broker又重新发送
- 消费者的业务消费接口应该设计为幂等性,比如:减库存有工作单的状态标志
- 使用防重表,发送消息每一个都有业务的唯一标识,处理过就不用处理
- RabbitMQ的每一条消息都有redelivered属性,可以获取是否是被重新投递过来的,而不是第一次投递过来的
10.3.消息积压
- 消费者宕机积压
- 消费者消费能力不足积压
- 发送者发送流量太大
- 上线更多的消费者,进行正常消费
- 上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处理