Restful定义
- Restful是一种软件架构与设计风格, 并非一套标准, 只提供了一些原则与约定条件。
- REST提供了一组架构约束,当作为一个整体来应⽤用时,强调组件交互的可伸缩性。
- 接⼝口的通⽤用性、组件的独⽴立部署、以及⽤用来减少交互延迟、增强安全性、封装遗留留系统的中间组件。
- 满足这些约束条件和原则的应用程序或设计就是Restful。
Richardson成熟模型
等级2加入了HTTP方法处理:
URI | HTTP方法 | 说明 |
---|---|---|
/order/ | GET | 获取所有订单信息 |
/order/ | POST | 增加新的订单信息 |
/order/{id} | GET | 获取指定的订单信息 |
/order/{id} | DELETE | 删除指定的订单信息 |
/order/{id} | PUT | 修改指定的订单信息 |
等级3为超媒体控制, 也是最为成熟的Rest模型。
常用HTTP状态码
状态码 | 描述 | 状态码 | 描述 |
---|---|---|---|
200 | OK | 400 | Bad |
201 | Created | 401 | Unauthorized |
202 | Accepted | 403 | Forbidden |
301 | Moved Permanently | 404 | Not Found |
303 | See Other | 410 | Gone |
304 | Not Modified | 500 | Internal Server Error |
307 | Temporary Redirect | 503 | Service Unavailable |
良好的URI规范
- URI的路径采用斜杠分隔符(/)来表示资源之间的层次关系。
- URI的路径使用逗号(,)与分号(;)来表示非层次元素。
- URI的查询部分,使用与符号(&)来分割参数。
- URI中避免出现文件扩展名(例:.jsp、.json、.xml、.html)
HATEOAS介绍
HATEOAS(Hypermedia as the engine of application state)是 REST 架构风格中最复杂的约束,也是构建成熟 REST 服务的核心。
HATEOAS常用链接类型
REL | 说明 |
---|---|
SELF | 指向当前资源本身的链接 |
edit | 指向⼀一个可以编辑当前资源的链接 |
collection | 如果当前资源包含在某个集合中,指向该集合的链接 |
search | 指向⼀一个可以搜索当前资源与其相关资源的链接 |
related | 指向⼀一个与当前资源相关的链接 |
first | 集合遍历相关的类型,指向第⼀一个资源的链接 |
last | 集合遍历相关的类型,指向最后⼀一个资源的链接 |
previous | 集合遍历相关的类型,指向上⼀一个资源的链接 |
next | 集合遍历相关的类型,指向下⼀一个资源的链接 |
3.4 HATEOAS CRUD示例
-
显示接口
http://39.98.152.160:10680/admin/accountWarnNotifyTemplate/search
-
分页查询
支持排序:
-
新增数据
-
更新数据
http://39.98.152.160:10680/admin/accountWarnNotifyTemplate/15
链接附带数据唯一ID, 提交采用PUT方式。
-
删除数据
http://39.98.152.160:10680/admin/accountWarnNotifyTemplate/15
提交方式采用PUT。
HATEOAS实现
服务设计
采用Spring Data Rest 实现 Hypermedia规范。
设计两个服务, 订单服务和股票服务, 两个服务遵循Hateoas风格。
- Step 1: 通过Restful的Hypermedia模型调用股票服务, 查询并打印股票信息。
- Step 2: 通过HTTP PUT动作更新股票价格。
- Step 3: 重新调用股票信息接口,打印股票名称与价格。
- Step 4: 以上步骤操作成功后, 订单服务调用自身接口, 生成订单信息。
工程说明
数据层采用spring data jpa,spring提供的一套简化JPA开发的框架,按照约定好的【方法命名规则】写dao层接口,就可以在不写接口实现的情况下,实现对数据库的访问和操作。同时提供了很多除了CRUD之外的功能,如分页、排序、复杂查询等等。
本工程侧重Hateoas的理解, 数据库采用简化的H2内存数据库, 重新启动服务数据消失。
- hateoas-demo父级工程
父级工程, 负责子工程依赖与打包管理。
POM依赖:
<dependencies>
<!-- spring boot 依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- Spring Restful实现组件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-rest</artifactId>
</dependency>
<dependency>
<groupId>org.jadira.usertype</groupId>
<artifactId>usertype.core</artifactId>
<version>6.0.1.GA</version>
</dependency>
<!-- 增加Jackson的Hibernate类型支持 -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-hibernate5</artifactId>
<version>2.9.8</version>
</dependency>
<!-- 增加Jackson XML支持 -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
- hateoas-stocks股票服务工程
- HateoasStocksApplication启动类:
@SpringBootApplication()
@ComponentScan(basePackages = {"com.itcast"})
@EntityScan(basePackages = {"com.itcast"})
@EnableJpaRepositories(basePackages = {"com.itcast"})
@EnableCaching
public class HateoasStocksApplication {
public static void main(String[] args) {
SpringApplication.run(HateoasStocksApplication.class, args);
}
@Bean
public Hibernate5Module hibernate5Module() {
return new Hibernate5Module();
}
@Bean
public Jackson2ObjectMapperBuilderCustomizer jacksonBuilderCustomizer() {
return builder -> {
builder.indentOutput(true);
};
}
}
注意: 要加上EntityScan与EnableJpaRepositories注解,指定路径, 否则不生效。
-
StockRepository代码:
定义两个方法,根据名称集合查找多个股票信息; 根据指定名称查找股票信息。
按照JPA规范,按照方法名称自动映射解析, 无须写SQL。
@RepositoryRestResource(path = "/stocks")
public interface StockRepository extends JpaRepository<StocksEntity, Long> {
List<StocksEntity> findByNameInOrderById(@Param("list") List<String> list);
public StocksEntity findByName(@Param("name") String name);
}
-
hateoas-order订单服务工程
HateoasOrderApplication启动类:
@SpringBootApplication @ComponentScan(basePackages = {"com.itcast"}) @EntityScan(basePackages = {"com.itcast"}) @EnableJpaRepositories(basePackages = {"com.itcast"}) public class HateoasOrderApplication { public static void main(String[] args) { SpringApplication.run(HateoasOrderApplication.class, args); } /** * 采用JACKSON作为JSON处理组件 * @return */ @Bean public Jackson2HalModule jackson2HalModule() { return new Jackson2HalModule(); } /** * 设置HTTP连接池参数 * @return */ @Bean public HttpComponentsClientHttpRequestFactory requestFactory() { PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(30, TimeUnit.SECONDS); connectionManager.setMaxTotal(200); connectionManager.setDefaultMaxPerRoute(20); CloseableHttpClient httpClient = HttpClients.custom() .setConnectionManager(connectionManager) .evictIdleConnections(30, TimeUnit.SECONDS) .disableAutomaticRetries() // Keep-Alive 策略 .setKeepAliveStrategy(new RemoteConnectionKeepAliveStrategy()) .build(); HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(httpClient); return requestFactory; } /** * 设置RestTemplate参数 * @param builder * @return */ @Bean public RestTemplate restTemplate(RestTemplateBuilder builder) { return builder .setConnectTimeout(Duration.ofMillis(2000)) .setReadTimeout(Duration.ofMillis(1800)) .requestFactory(this::requestFactory) .build(); } }
核心处理类RemoteRunner实现ApplicationRunner接口, 系统启动成功后便会执行run方法:
@Component @Slf4j public class RemoteRunner implements ApplicationRunner { private static final URI ROOT_URI = URI.create("http://localhost:8080/"); @Autowired private RestTemplate restTemplate; @Autowired private OrderRepository orderRepository; @Override public void run(ApplicationArguments args) throws Exception { Link stocksLink = getLink(ROOT_URI,"stocksEntities"); // Step 1: 查询股票信息 queryStocks(stocksLink); // Step 2: 更新股票价格 Link updateLink= getLink(ROOT_URI.resolve("stocks/1"),"stocksEntity"); Resource<StocksEntity> americano = updateStocks(updateLink); // Step 3: 重新查询打印股票信息 queryStocks(stocksLink); // Step 4: 生成订单信息 OrderEntity order = OrderEntity.builder() .user("mirson") .stockName("建设银行") .volume(1000) .price(99.9) .build(); orderRepository.save(order); } /** * 获取请求链接 * @param uri * @param rel * @return */ private Link getLink(URI uri, String rel) { ResponseEntity<Resources<Link>> rootResp = restTemplate.exchange(uri, HttpMethod.GET, null, new ParameterizedTypeReference<Resources<Link>>() {}); Link link = rootResp.getBody().getLink(rel); log.info("Link: {}", link); return link; } /** * 查询股票信息 * @param stocksLink */ private void queryStocks(Link stocksLink) { ResponseEntity<PagedResources<Resource<StocksEntity>>> stocksResp = restTemplate.exchange(stocksLink.getTemplate().expand(), HttpMethod.GET, null, new ParameterizedTypeReference<PagedResources<Resource<StocksEntity>>>() {}); if(null != stocksResp.getBody() && null != stocksResp.getBody().getContent() ) { StringBuffer strs = new StringBuffer(); stocksResp.getBody().getContent().forEach((s)->{ strs.append(s.getContent().getName()).append(":").append(s.getContent().getPrice()).append( ","); }); String resp = strs.toString().replaceAll(",$", ""); log.info("query stocks ==> " + resp); }else { log.info("query stocks ==> empty! "); } } /** * 更新股票信息 * @param link * @return */ private Resource<StocksEntity> updateStocks(Link link) { StocksEntity americano = StocksEntity.builder() .name("中国平安") .price(68.9) .build(); RequestEntity<StocksEntity> req = RequestEntity.put(link.getTemplate().expand()).body(americano); ResponseEntity<Resource<StocksEntity>> resp = restTemplate.exchange(req, new ParameterizedTypeReference<Resource<StocksEntity>>() {}); log.info("add Stocks ==> {}", resp); return resp.getBody(); } }
3.3 启动股票服务验证
-
启动股票服务,通过HTTP访问, 来查看Rest接口信息
-
地址: http://127.0.0.1:8080/
{ "_links" : { "stocksEntities" : { "href" : "http://127.0.0.1:8080/stocks{?page,size,sort}", "templated" : true }, "profile" : { "href" : "http://127.0.0.1:8080/profile" } } }
可以看到我们定义的/stocks接口
-
地址: http://127.0.0.1:8080/stocks
{ "_embedded" : { "stocksEntities" : [ { "createTime" : "2019-07-09T14:20:44.644+0000", "updateTime" : "2019-07-09T14:20:44.644+0000", "name" : "中国平安", "price" : 68.6, "_links" : { "self" : { "href" : "http://127.0.0.1:8080/stocks/1" }, "stocksEntity" : { "href" : "http://127.0.0.1:8080/stocks/1" } } }, { "createTime" : "2019-07-09T14:20:44.647+0000", "updateTime" : "2019-07-09T14:20:44.647+0000", "name" : "工商银行", "price" : 58.8, "_links" : { "self" : { "href" : "http://127.0.0.1:8080/stocks/2" }, "stocksEntity" : { "href" : "http://127.0.0.1:8080/stocks/2" } } }, { "createTime" : "2019-07-09T14:20:44.648+0000", "updateTime" : "2019-07-09T14:20:44.648+0000", "name" : "招商银行", "price" : 98.9, "_links" : { "self" : { "href" : "http://127.0.0.1:8080/stocks/3" }, "stocksEntity" : { "href" : "http://127.0.0.1:8080/stocks/3" } } } ] }, "_links" : { "self" : { "href" : "http://127.0.0.1:8080/stocks{?page,size,sort}", "templated" : true }, "profile" : { "href" : "http://127.0.0.1:8080/profile/stocks" }, "search" : { "href" : "http://127.0.0.1:8080/stocks/search" } }, "page" : { "size" : 20, "totalElements" : 3, "totalPages" : 1, "number" : 0 } }
打印了所有股票信息,最下面还暴露了其他接口信息。
-
地址: http://127.0.0.1:8080/stocks/search
{ "_links" : { "findByName" : { "href" : "http://127.0.0.1:8080/stocks/search/findByName{?name}", "templated" : true }, "findByNameInOrderById" : { "href" : "http://127.0.0.1:8080/stocks/search/findByNameInOrderById{?list}", "templated" : true }, "self" : { "href" : "http://127.0.0.1:8080/stocks/search" } } }
打印了我们通过JPA定义的两个接口,findByNameInOrderById与findByName。
-
地址: http://127.0.0.1:8080/stocks/search/findByNameInOrderById?list=中国平安,test
查询中国平安与test两只股票,结果:
{ "_embedded" : { "stocksEntities" : [ { "createTime" : "2019-07-09T14:20:44.644+0000", "updateTime" : "2019-07-09T14:20:44.644+0000", "name" : "中国平安", "price" : 68.6, "_links" : { "self" : { "href" : "http://127.0.0.1:8080/stocks/1" }, "stocksEntity" : { "href" : "http://127.0.0.1:8080/stocks/1" } } } ] }, "_links" : { "self" : { "href" : "http://127.0.0.1:8080/stocks/search/findByNameInOrderById?list=%E4%B8%AD%E5%9B%BD%E5%B9%B3%E5%AE%89,latte" } } }
-
3.4 启动订单服务验证
-
启动订单服务, 完整验证四个步骤处理流程。
条件: 订单服务, 预期是修改中国平安的股票, 从价格68.6改成68.9;
新增订单信息: 股票名称建设银行,用户名是mirson, 交易数量为1000, 价格为99.9。
-
控制台日志
看到两行日志,中国平安的价格发生了变化,从68.6修改为68.9。
-
查看order服务的订单信息
地址: http://127.0.0.1:8082/order
{ "_embedded" : { "orderEntities" : [ { "createTime" : "2019-07-09T14:35:42.520+0000", "updateTime" : "2019-07-09T14:35:42.520+0000", "user" : "mirson", "stockName" : "建设银行", "volume" : 1000, "price" : 99.9, "_links" : { "self" : { "href" : "http://127.0.0.1:8082/order/1" }, "orderEntity" : { "href" : "http://127.0.0.1:8082/order/1" } } } ] }, "_links" : { "self" : { "href" : "http://127.0.0.1:8082/order{?page,size,sort}", "templated" : true }, "profile" : { "href" : "http://127.0.0.1:8082/profile/order" }, "search" : { "href" : "http://127.0.0.1:8082/order/search" } }, "page" : { "size" : 20, "totalElements" : 1, "totalPages" : 1, "number" : 0 } }
生成了我们新增的订单信息。此外还可以通过Postman来模拟增删改查操作, Spring Data Rest都帮我们做好封装。
通过Spring Data Rest整个实现流程非常简单, 没有Controller层, 这正是Restful的设计风格, 以资源为对象, 无需过多的流程,转换处理。
GPRC简介
gRPC 是Google开源的高性能、通用的RPC框架。客户端与服务端约定接口调用, 可以在各种环境中运行,具有跨语言特性, 适合构建分布式、微服务应用。
-
性能优异:
-
采用Proto Buffer作序列化传输, 对比JSON与XML有数倍提升。
-
采用HTTP2协议, 头部信息压缩, 对连接进行复用, 减少TCP连接次数。
-
gRPC底层采用Netty作为NIO处理框架, 提升性能。
-
-
多语言支持,多客户端接入, 支持C++/GO/Ruby等语言。
-
支持负载均衡、跟踪、健康检查和认证。
GPRC线程模型
gRPC 的线程模型遵循 Netty 的线程分工原则,协议层消息的接收和编解码由 Netty 的 I/O(NioEventLoop) 线程负责, 应用层的处理由应用线程负责,防止由于应用处理耗时而阻塞 Netty 的 I/O 线程。
BIO线程模型采用了线程池,但是后端的应用处理线程仍然采用同步阻塞的模型,阻塞的时间取决对方I/O处理的速度和网络I/O传输的速度。
客户端调用流程
- 客户端 Stub 调用 发起 RPC 调用 远程服务。
- 获取服务端的地址信息(列表),使用默认的 LoadBalancer 策略,选择一个具体的 gRPC 服务端。
- 如果与服务端之间没有可用的连接,则创建 NettyClientTransport 和 NettyClientHandler,建立 HTTP/2 连接。
- 对请求使用 PB(Protobuf)序列化,通过 HTTP/2 Stream 发送给 gRPC 服务端。
- 服务端接收到响应之后,使用 PB(Protobuf)做反序列化。
- 回调 GrpcFuture 的 set(Response) 方法,唤醒阻塞的客户端调用线程,获取 RPC 响应数据。
GRPC使用
服务设计
工程结构
|- grpc-demo
|-- grpc-client
|-- grpc-lib
|-- grpc-server
- grpc-demo : 父级工程, 管理依赖相关。
- grpc-client: 客户端工程,负责调用gRPC服务, 提供HTTP服务触发。
- grpc-server: 股票服务端工程, 提供股票价格接口。
- grpc-lib: 公用工程,生成为protobuf对象与gRPC Service。
3.3 Protoc生成协议文件
下载工具: protoc
下载插件: protoc-gen-grpc-java
进入proto文件目录, 执行以下命令, 生成gRPC对象与Service:
protoc --plugin=protoc-gen-grpc-java=d:/TestCode/protoc-grpc.exe --java_out=./ --grpc-java_out=./ StockService.proto
注意插件路径要写正确, 要指定protobuf与grpc两个输出位置, 命令指定在当前同级目录生成协议文件。
3.4 工程说明
-
grpc-demo
POM文件
<dependencies> <!-- spring boot grpc 相关依赖 --> <dependency> <groupId>net.devh</groupId> <artifactId>grpc-spring-boot-starter</artifactId> <version>2.4.0.RELEASE</version> </dependency> <dependency> <groupId>net.devh</groupId> <artifactId>grpc-server-spring-boot-starter</artifactId> <version>2.4.0.RELEASE</version> </dependency> </dependencies>
-
grpc-server服务端
GrpcStockService类, 重写gRPC Service定义的接口,生成指定范围随机数的价格 :@GrpcService public class GrpcStockService extends StockServiceGrpc.StockServiceImplBase { @Override public void getStockPrice(StockServiceRequest request, StreamObserver<StockServiceReply> responseObserver) { String msg = "股票名称:" + request.getName() + ", 股票价格:" + (new Random().nextInt(100-20)+20); StockServiceReply reply =StockServiceReply.newBuilder().setMessage(msg).build(); responseObserver.onNext(reply); responseObserver.onCompleted(); } }
GrpcServiceStartup启动类:
@SpringBootApplication @ComponentScan(basePackages = {"com.itcast"}) public class GrpcServerStartup { public static void main(String[] args) { SpringApplication.run(GrpcServerStartup.class, args); } }
-
grpc-client客户端
GrpcClientService类:
@Service public class GrpcClientService { @GrpcClient("grpc-server") private StockServiceGrpc.StockServiceBlockingStub stockServiceStub; public String getStockPrice(final String name) { try { final StockServiceReply response = stockServiceStub.getStockPrice(StockServiceRequest.newBuilder().setName(name).build()); return response.getMessage(); } catch (final StatusRuntimeException e) { return "error!"; } } }
注解GrpcClient映射的名称为grpc-server, 不能随便填写,要与配置保持一致。
GrpcClientApplication启动类:@SpringBootApplication @RestController @ComponentScan(basePackages = {"com.itcast"}) public class GrpcClientApplication { @Autowired private GrpcClientService grpcClientService; public static void main(String[] args) { SpringApplication.run(GrpcClientApplication.class, args); } @RequestMapping("/") public String getStockPrice(@RequestParam(defaultValue = "中国平安") String name) { return grpcClientService.getStockPrice(name); } }
application.yml配置文件:
server: port: 9000 spring: application: name: grpc-client grpc: client: grpc-server: address: 'static://127.0.0.1:9999' enableKeepAlive: true keepAliveWithoutCalls: true negotiationType: plaintext
这里面定义名为【grpc-server】的服务配置信息, 与上面注解要保持一致。
-
grpc-lib客户端
StockService.proto文件:
syntax = "proto3"; option java_multiple_files = true; option java_package = "com.itcast.grpc.lib"; option java_outer_classname = "StockServiceProto"; // The stock service definition. service StockService { // get stock price by name rpc GetStockPrice (StockServiceRequest) returns (StockServiceReply) { } } // The request message message StockServiceRequest { string name = 1; } // The response message message StockServiceReply { string message = 1; }
POM依赖
<dependencies> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.8.0</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-netty-shaded</artifactId> <version>1.21.0</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-protobuf</artifactId> <version>1.21.0</version> <exclusions> <exclusion> <artifactId>protobuf-java</artifactId> <groupId>com.google.protobuf</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-stub</artifactId> <version>1.21.0</version> </dependency> </dependencies>
注意Protobuf生成工具和组件要保持一致, 工具我们用的是3.8.0最新版, 依赖也要改成3.8.0,排除了grpc-protobuf的传递依赖。
启动验证
启动服务端与客户端, 访问客户端地址: http://127.0.0.1:9000?name=中国银行
不断刷新请求, 股票价格也会随机变化,能够正常结合Spring Boot访问gRPC服务。
SEATA FESCAR基本概念与理论
2019 年 1 月,阿里巴巴中间件团队发起了开源项目 Fescar(Fast & EaSy Commit And Rollback),和社区一起共建开源分布式事务解决方案。Fescar 的愿景是让分布式事务的使用像本地事务的使用一样,简单和高效,并逐步解决开发者们遇到的分布式事务方面的所有难题。
Fescar 开源后,蚂蚁金服加入 Fescar 社区参与共建,并在 Fescar 0.4.0 版本中贡献了 TCC 模式。为了打造更中立、更开放、生态更加丰富的分布式事务开源社区,经过社区核心成员的投票,大家决定对 Fescar 进行品牌升级,并更名为 Seata,意为:Simple Extensible Autonomous Transaction Architecture,是一套一站式分布式事务解决方案。
发展历程:
传统分布式事务解决方案
- 两阶段提交(2PC)
- 两阶段事务提交长时间锁定, 但也不能保证事务的百分百可靠,同时对性能较大影响,某个服务出现故障, 影响全局事务, 可用性差,不适合分布式微服务领域。
补偿事务(TCC)
主要分为Try、Confirm、Cancel三个阶段。
Try 主要做检测校验及预处理工作;
Confirm 是对业务做确认提交动作, 一般Try处理成功, Confirm也会成功。
Cancel是在某个业务环节执行错误的时候, 或者极端Confirm出错情况下, 执行的补偿方法。比如转账没有 成功到达对方账户, 那么Cancel就要把钱退回转帐方账户。
TCC侵入性较强, 需要写较多补偿方法, 加入补偿机制, 而且必须保障幂等,整体复杂, 且开发量大, 也 不易维护。
- 异步消息一致性
将分布式事务拆分成本地事务, 通过消息队列记录并通知各服务事务处理结果:
-
A 系统先发送一个 prepared 消息到 mq,如果这个 prepared 消息发送失败那么就直接取消操作别执行了;
-
如果这个消息发送成功过了,那么接着执行本地事务,如果成功就告诉 mq 发送确认消息,如果失败就告诉 mq 回滚消息;
-
如果发送了确认消息,那么此时 B 系统会接收到确认消息,然后执行本地的事务;
-
消息队列会自动定时轮询所有发送过 prepared 消息但未发送确认消息的服务,这个消息是不是本地事务处理失败了, 是继续重试还是回滚?服务可以查下数据库看之前本地事务是否执行,如果回滚了,那么这里也回滚吧。这个机制的作用就是避免可能本地事务执行成功了,而确认消息却发送失败了。
-
这个方案里,要是系统 B 的事务失败了咋办?自动不断重试直到成功,如果实在是不行,要么就是针对重要的资金类业务进行全局回滚,比如 B 系统本地回滚后,再通知系统 A 也回滚;或是发送报警由人工来回滚或补偿。
CAP理论
CAP的含义:
- C:Consistency 一致性
- A:Availability 可用性
- P:Partition tolerance 分区容错性
在分布式系统中,C、A、P三个条件中我们最多只能满足两个要求。
一般在分布式领域, 会通过牺牲一致性来换取系统的可用性和分区容错性。
BASE理论
所谓的“牺牲一致性”并不是完全放弃数据一致性,而是牺牲强一致性换取弱一致性,这样我们就有兼顾全局的可能。BASE理论:
- BA:Basic Available 基本可用
整个系统在某些不可抗力的情况下,仍然能够保证“可用性”,即一定时间内仍然能够返回一个明确的结果。只不过“基本可用”和“高可用”的区别是:
【一定时间】可以适当延长 ;
当举行大促时,【响应时间】可以适当延长;
给部分用户直接返回一个降级页面,从而缓解服务器压力。但要注意,返回降级页面仍然是返回明确结果。 - S:Soft State:柔性状态
同一数据的不同副本的状态,可以不需要实时一致。 - E:Eventual Consisstency:最终一致性
同一数据的不同副本的状态,可以不需要实时一致,但一定要保证经过一定时间后仍然是一致的。
SEATA处理机制
所有微服务以本地事务方式处理,作为分支事务, 各服务之间通过RCP通信, 所有分支事务由全局事务管控。
SEATA分布式事务的解决方案是由一个全局事务(Global Transaction), 和一批分支事务(Branch Transation)组成, 分支事务也就是各微服务的本地事务。
SEATA的三大组件:
-
Transaction Coordinator(TC):维护全局和分支事务的状态,驱动全局事务提交与回滚。
-
Transaction Manager™:定义全局事务的范围:开始、提交或回滚全局事务。
-
Resource Manager(RM):管理分支事务处理的资源,与 TC 通信以注册分支事务并报告分支事务的状态,并驱动分支事务提交或回滚。
SEATA分布式事务管理的生命周期过程:
-
TM 要求 TC 开始新的全局事务,TC 生成表示全局事务的 XID。
-
XID 通过微服务的调用链传播。
-
RM 在 TC 中将本地事务注册为 XID 的相应全局事务的分支。
-
TM 要求 TC 提交或回滚 XID 的相应全局事务。
-
TC 驱动 XID 的相应全局事务下的所有分支事务,完成分支提交或回滚。
更多机制实现细节及原理,比如AT模式、MT模式和XA模式的原理与设计, 请参考[SEATA WIKI](
SEATA FESCAR使用
服务设计
这里引用官方的与Spring Boot的结合示例,Seata-samples-springboot 集成到我们工程当中, 并做调整改进。
主要设计了两个接口, 一个是修改状态, 另外一个是修改数量,并抛出异常, 两个接口没有受内部事务控制, 集成在一个工程当中。
增加数量接口会模拟抛出异常, 我们需要验证, 修改状态接口的数据是否会产生回滚。
工程结构
- SeataDemoApplication启动类:
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
@ComponentScan(basePackages = {"com.itcast"})
@EntityScan(basePackages = {"com.itcast"})
@EnableJpaRepositories(basePackages = {"com.itcast"})
@RestController
@EnableTransactionManagement
public class SeataDemoApplication {
final String ASSET_ID = "14070e0e3cfe403098fa9ca37e8e7e76";
@Autowired
private IAssignService assignService;
public static void main(String[] args) {
SpringApplication.run(SeataDemoApplication.class, args);
}
/**
* Home string.
*
* @return the string
*/
@RequestMapping(value = "/asset/assign")
@ResponseBody
public String assetAssign() {
String result;
try {
AssetAssign assetAssign = assignService.increaseAmount(
ASSET_ID);
result = assetAssign.toString();
} catch (Exception e) {
result = ExceptionUtils.getMessage(e);
}
return result;
}
}
注意要开启EnableJpaRepositories和ComponentScan包扫描。
这里定义了外部接口, 调用内部service方法。
- AssetServiceImpl类:
@Service
@Component
public class AssetServiceImpl implements IAssetService {
/**
* The constant LOGGER.
*/
public static final Logger LOGGER = LoggerFactory.getLogger(IAssetService.class);
/**
* The constant ASSET_ID.
*/
public static final String ASSET_ID = "DF001";
@Autowired
private AssetRepository assetRepository;
@Override
public int increase() {
LOGGER.info("Asset Service Begin ... xid: " + RootContext.getXID() + "\n");
Asset asset = assetRepository.findById(ASSET_ID).get();
asset.setAmount(asset.getAmount().add(new BigDecimal("1")));
assetRepository.save(asset);
throw new RuntimeException("test exception for seata, your transaction should be rollbacked,asset=" + asset);
}
}
这里定义了一个增加数量的接口, 内部会抛出一个自定义的RuntimeException, 这个方法没有加任何事务处理。
- AssignServiceImpl类:
@Service
public class AssignServiceImpl implements IAssignService {
private static final Logger LOGGER = LoggerFactory.getLogger(AssignServiceImpl.class);
@Autowired
private AssignRepository assignRepository;
@Autowired
private IAssetService assetService;
@Override
// @Transactional
@GlobalTransactional
public AssetAssign increaseAmount(String id) {
LOGGER.info("Assign Service Begin ... xid: " + RootContext.getXID() + "\n");
AssetAssign assetAssign = assignRepository.findById(id).get();
assetAssign.setStatus("2");
assignRepository.save(assetAssign);
// remote call asset service
assetService.increase();
return assetAssign;
}
}
这里实现两个动作, 一个是修改assetAssign的状态, 另一个是调用增加数量的接口。
注意官方示例里面@Transactional是开启的, 这个我们需要把它关闭。
GlobalTransactional是Seata提供的注解, 这个需要加上。
从这两个业务实现可以看到, 内部没有采用任务事务处理,如果没有纳入分布式事务, 即便抛出异常, 对数据库的操作仍会生效。
- SeataConfiguration配置类:
/**
* The type Fescar configuration.
*/
@Configuration
public class SeataConfiguration {
@Value("${spring.application.name}")
private String applicationId;
/**
* 注册一个StatViewServlet
*
* @return global transaction scanner
*/
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
GlobalTransactionScanner globalTransactionScanner = new GlobalTransactionScanner(applicationId,
"my_test_tx_group");
return globalTransactionScanner;
}
}
里面指定了一个名称为my_test_tx_group组别, 这是一个标识, Seata可以支持多个组别, 多个配置存在。
-
工程配置
application.yml:server: port: 9999 servlet: context-path: /demo spring: application: name: seata-springboot-app datasource: driverClassName: com.mysql.jdbc.Driver url: jdbc:mysql://192.168.19.150:3306/seata?useSSL=false&serverTimezone=UTC username: root password: 654321 poolPingConnectionsNotUsedFor: 60000 removeAbandoned: true connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000 minIdle: 1 validationQuery: SELECT 1 FROM DUAL initialSize: 5 maxWait: 60000 poolPreparedStatements: false filters: stat,wall testOnBorrow: false testWhileIdle: true minEvictableIdleTimeMillis: 300000 timeBetweenEvictionRunsMillis: 60000 testOnReturn: false maxActive: 50 druid: user: admin password: admin jpa: hibernate: ddl-auto: none show-sql: true
file.conf(Seata配置)
transport { # tcp udt unix-domain-socket type = "TCP" #NIO NATIVE server = "NIO" #enable heartbeat heartbeat = true #thread factory for netty thread-factory { boss-thread-prefix = "NettyBoss" worker-thread-prefix = "NettyServerNIOWorker" server-executor-thread-prefix = "NettyServerBizHandler" share-boss-worker = false client-selector-thread-prefix = "NettyClientSelector" client-selector-thread-size = 1 client-worker-thread-prefix = "NettyClientWorkerThread" # netty boss thread size,will not be used for UDT boss-thread-size = 1 #auto default pin or 8 worker-thread-size = 8 } } store { # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions max-branch-session-size = 16384 # globe session size , if exceeded throws exceptions max-global-session-size = 512 # file buffer size , if exceeded allocate new buffer file-write-buffer-cache-size = 16384 # when recover batch read size session.reload.read_size = 100 } service { #vgroup->rgroup vgroup_mapping.my_test_tx_group = "default" #only support single node default.grouplist = "127.0.0.1:8091" #degrade current not support enableDegrade = false #disable disable = false } client { async.commit.buffer.limit = 10000 lock { retry.internal = 10 retry.times = 30 } } ## transaction log store store { ## store mode: file、db mode = "file" ## file store file { dir = "file_store/data" # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions max-branch-session-size = 16384 # globe session size , if exceeded throws exceptions max-global-session-size = 512 # file buffer size , if exceeded allocate new buffer file-write-buffer-cache-size = 16384 # when recover batch read size session.reload.read_size = 100 } ## database store db { driver_class = "" url = "" user = "" password = "" } }
registry.conf(Seata配置):
registry { # file 、nacos 、eureka、redis、zk type = "file" file { name = "file.conf" } } config { # file、nacos 、apollo、zk type = "file" file { name = "file.conf" } }
3.3 启动验证
启动服务之前,先要启动Seata-Server。 可直接下载运行包
seata-server 默认会连接ZK, 下载运行Zookeeper
确保数据库init_db.sql脚本执行成功, 如果seata采用数据库模式, 确保db_store.sql文件执行成功。
- 启动服务
- 访问地址: http://127.0.0.1:9999/demo/asset/assign
抛出一个异常, 控制台日志, 打印出回滚信息:
amout数量还是为1, 回滚成功。
查看Asset_assign表
status为04, 没有被修改成2, 回滚成功。