GRPC - JAVA笔记
gRPC简介
由google开源的一个高性能的RPc框架,由google内部的Stubby框架演化而来。2015年正式开源。云原生时代的RPC标准,由Go语言开发
gRPC的核心设计思路
网络通信 ------> gRPC 自己封装了网络通信的部分,提供了多种语言的 网络通信的封装,解决异构服务的问题 (C Java[Netty] Go)
协议 ------> HTTP2 传输数据的时候 使用二进制的数据内容。支持双向流(双工) 支持连接的多路复用
序列化 ------> 数据传输的格式,主要有两种 基于文本(JSON) 基于二进制 java原生的序列化方式
谷歌开源的序列化方式:protubuf:Protocol Buffers 时间和空间效率是JSON的3-5倍
IDL语言
代理的创建 ----> 让调用者像调用本地方法那样,去调用远端的服务方法 称为 stub
gPRC和ThriftRPC的区别
共性:支持异构语言的RPC
区别:
网络通信 Thrift TCP 专属协议
GRPC HTTP2
性能角度 ThriftRPC 性能高于 gRPC
gRPC 大厂支持,在云原生时代与其他组件更容易集成,所以gRPC应用更广泛
gRPC的好处
- 高效的进行进程间通信 序列化和协议
- 支持多语言 原生支持 C Go Java (一等公民)实现,C语言版本上扩展 C++,C#,NodeJS,Python等(二等公民)
- 支持多平台运行
- GRPC采用protobuf
- 使用HTTP2协议(只有GRPC使用)
HTTP2.0 协议
1. 回顾HTTP1.x协议
HTTP1.0 协议 请求响应的模式 短链接(无状态)协议 为了解决状态问题,使用了HttpSession的技术解决 单工通信 传输数据文本结构
HTTP1.1 协议 请求响应的模式 有限长连接 CS连接会保持一段时间 升级到了WebSocket方式 双工 可以实现服务器向客户端推送
总结Http1.x协议 共性
1. 传输数据文本格式,可读性好但是效率差
2. 本质上Http1.x无法实现双工通信
3. 资源的请求,需要发送多次请求,建立多个连接才可以完成
获取页面时,需要多次请求获取js和css文件,采用异步的方式进行。此时服务器压力增大,可以使用动静分离的手段解决,使用CDN缓存,减轻主服务器的请求压力
2. Http2.0协议
1. HTTP2.0协议是一个二进制协议,效率高于Http1.0协议,但是可读性差
2. 可以实现双工通信
2. 一个请求,一个连接,可以请求多个数据 (多路复用)
3. Http2.0协议的三个概念(参看图)
1. 数据流 stream
2. 消息 message
3. 帧 frame
4. 其他相关概念 (新的技术总是会把之前协议种使用程序来处理的问题放到标准中去)
1. 数据流的优先级,可以通过位不同的stream设置权重,限制不同流的传输顺序
2. 流控,client发送的数据太快了,服务器端处理不过来,通知client暂停数据发送
Protocol Buffers [protubuf]
1. protobuf 是一种与编程语言无关 [IDL],与具体的平台无关。它定义的中间语言,可以方便的在client与server种进行RPC的数据传输
2. protobuf两种版本,2和3,主要使用的是3
3. protobuf需要本地安装protubuf的编译器,将protobuf的IDL语言转换成一种特定的语言
4. protobuf编译器的安装:官方在最想版本没有提供windows的安装版本,可以考虑自己编译或者是降低版本去安装 https://github.com/protocolbuffers/protobuf/releases/v3.19.6
1. 解压缩
2. 配置环境变量
3. cmd 输入 protoc --version 查看
5. idea 安装protobuf的插件,显示IDL语言提示
idea 2022和2023 原生支持protobuf插件
Protobuf 语法详解
-
文件格式
文件一定要定义在以.proto结尾的文件中 UserService.proto OrderService.proto
-
版本设定
syntax = "proto3"
-
注释
1. 单行注释 // 2. 多行注释 /* */
-
与java相关的语法
// 指定后续protobuf生成的java代码是一个源文件还是多个源文件 xx.java option java_multiple_files = false; // 指定protobuf生成的类放在那个包中 option java_package = "indi.yuluo"; // 指定protobuf生成的外部类的名字 (管理内部类【内部类才是真正开发使用的】) option java_outer_classname = "";
-
逻辑包【java开发用的少】
// protobuf对于文件内容的管理 package xxx;
-
导入
// 在其他文件中导入其他的protobuf文件中定义的内容 在OrderService.proto中使用UserService.proto中定义的内容 import "xxx/UserService.proto";
-
基本类型
官方文档:https://protobuf.dev/programming-guides/proto3
-
枚举
enum SEASON { SPRING = 0; SUMMER = 1; } 枚举的值必须从0开始
-
消息 Message
// 请求后缀为 Request 响应为Response message LoginRequest { string username = 1; // 消息中的字段编号 singular string password = 2; int32 age = 3; } 消息相关的细节问题 1. 编号 从1开始到2^29-1,不一定连续 注意:19000-19999 编号不能使用,是protobuf自己保留的编号 2. 在消息字符中可以加入两个关键字, 一个是 singular(这个字段的值只能是0或者1个,"" 或 "123456", 是默认关键字) 一个是repeated。起修饰字符的作用,字段返回值是多个,等价于 java中的List集合 message Result { string content = 1; repeated string stutas = 2; // 此字段的返回值是多个,等价于Java List } 3. 消息可以定义多个 message LoginRequest{ .... } message LoginREsponse { ... } 4. 消息可以嵌套 message SearchResponse { // 定义 message Result { string url = 1; string title = 2; } string xxx = 1; int32 yyy = 2; // 上面所定义的字段 Result ppp = 3; } 5. oneof关键字 [其中一个] 使用较少 message SimpleMessage { // 当我们使用test_oneof字段时,它得值只能是其中一个 oneof test_oneof { string name = 1; int32 age = 2; } }
-
服务定义
service HelloService { // HelloRequest是响应消息(message) rpc hello(HelloRequest) returns(HelloRespose) {} } 1. 里面可以定义多个服务方法 2. 在开发过程中,可以定义多个服务接口 3. gRPC服务有四种服务方
gRPC 开发实战
-
项目结构
1. xxx-api 模块 用于定义protobuf idl语言,并且通过命令创建具体的代码,后续client server引入使用 1. message 2. service 2. xxx-server 模块 1. 实现api模块中定义的服务接口 2. 发布gRPC服务 (创建服务端程序) 3. xxx-client模块 1. 创建服务端stub(代理) 2. 基于代理(stub)进行RPC调用
-
api 模块编程内容
1. .proto文件书写protobuf的IDL 2. protoc命令 把proto文件中的IDL转成成为编程语言 protoc --java_out=/xxx/xxx /xxx/xxx/xx // --java_out 代表生成对应的java代码文件 生成go代码就是 --go 3. 在实战过程中,我们会使用maven的插件进行protobuf文件的编译,并将其放在对应文件位置 https://github.com/grpc/grpc-java
<dependency> <groupId>io.grpc</groupId> <artifactId>grpc-netty-shaded</artifactId> <version>1.54.1</version> <scope>runtime</scope> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-protobuf</artifactId> <version>1.54.1</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-stub</artifactId> <version>1.54.1</version> </dependency> <dependency> <!-- necessary for Java 9+ --> <groupId>org.apache.tomcat</groupId> <artifactId>annotations-api</artifactId> <version>6.0.53</version> <scope>provided</scope> </dependency> <!-- 配置插件 --> <build> <extensions> <extension> <groupId>kr.motd.maven</groupId> <artifactId>os-maven-plugin</artifactId> <version>1.7.1</version> </extension> </extensions> <plugins> <plugin> <groupId>org.xolstice.maven.plugins</groupId> <artifactId>protobuf-maven-plugin</artifactId> <version>0.6.1</version> <configuration> <!--生成message信息--> <!--os.detected.classifier maven 自定义环境变量,用来获取相应的操作系统信息 --> <protocArtifact>com.google.protobuf:protoc:3.21.7:exe:${os.detected.classifier}</protocArtifact> <pluginId>grpc-java</pluginId> <!--生成service 服务接口--> <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.54.1:exe:${os.detected.classifier}</pluginArtifact> </configuration> <executions> <execution> <goals> <goal>compile</goal> <goal>compile-custom</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
此时执行maven插件的命令 complie生成消息文件,使用complie-custom来生成服务接口信息,之后手动将生成在target文件中的文件移动到main,java包中去
在生产角度来说,此操作过于繁杂,需要执行两个maven命令,之后手动移动文件夹。如果文件太多,操作麻烦
此时可以手动创建一个 goal 来执行两个命令,简化maven执行命令次数。
选择来自plugin goal的两个命令执行一个maven命令即可完成。
设置maven的输出内容来完成手动移动目录的操作
<!--设置输出文件目录--> <outputDirectory>${basedir}/src/main/java</outputDirectory> <!--追加生成,不覆盖删除之前的代码--> <clearOutputDirectory>false</clearOutputDirectory>
-
xxx-server 服务端模块的开发
-
实现业务接口 添加具体的功能 (MyBatis + MysSQL)
@Override public void hello(HelloProto.HelloRequest request, StreamObserver<HelloProto.HelloResponse> responseObserver) { // 1. 接受 client 的参数 String name = request.getName(); // 2. 业务处理 System.out.println("name parameter:" + name); // 3. 封装相应 // 3.1 创建响应对象的构造者 HelloProto.HelloResponse.Builder builder = HelloProto.HelloResponse.newBuilder(); // 3.2 填充数据 builder.setResult("hello method invoke ok"); // 3.3 封装响应 HelloProto.HelloResponse helloResponse = builder.build(); responseObserver.onNext(helloResponse); responseObserver.onCompleted(); }
-
创建服务端 (Netty)
public class GRPCServer1 { public static void main(String[] args) throws InterruptedException, IOException { // 解决端口和服务发布的问题 // 绑定端口 ServerBuilder<?> serverBuilder = ServerBuilder.forPort(9000); // 发布服务 serverBuilder.addService(new HelloServiceImpl()); // 创建服务对象 Server server = serverBuilder.build(); server.start(); server.awaitTermination(); } }
-
启动服务,测试效果
为了更方便的查看日志信息,可以将slf4j引入项目中
<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>2.0.7</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.4.6</version> </dependency>
-
-
xxx-client 模块
/client 通过代理对象完成远端对象的调用 public class GrpcClient1 { public static void main(String[] args) { // 1. 创建通信的管道 ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000) .usePlaintext().build(); // 2. 获得代理对象 stub HelloServiceGrpc.HelloServiceBlockingStub helloService = HelloServiceGrpc.newBlockingStub(managedChannel); // 3. 完成rpc调用 // 3.1 准备参数 HelloProto.HelloRequest.Builder builder = HelloProto.HelloRequest.newBuilder(); builder.setName("yuluo"); HelloProto.HelloRequest helloRequest = builder.build(); // 3.2 进行rpc功能调用 HelloProto.HelloResponse helloResponse = helloService.hello(helloRequest); // 获取响应内容 String result = helloResponse.getResult(); System.out.println("result: " + result); } }
-
开发注意事项
服务端 处理返回值时 responseObserver.onNext(helloResponse); // 通过这个方法,把响应的消息回传给 client responseObserver.onCompleted(); // 通知 client 整个服务结束。底层返回标记 // client 就会监听标记 grpc 自己做的
gRPC的四种通信方式
-
四种通信方式
1, 简单rpc、一元rpc(Unary RPC) 2. 服务端流式RPC (Server Streaming RPC) 3. 客户端流式RPC (Client Streaming RPC) 4. 双向流RPC (Bi-directinal Stream RPC)
-
简单RPC(一元RPC)
-
特点:
-
当client发起调用后,提交数据,并且等待 服务端响应
-
开发过程中,主要采用的是一元RPC的通信方式
-
语法
// 服务接口定义 service HelloService{ rpc hello(HelloRequest) returns (HelloResponse) {} // 练习 rpc hello1(HelloRequest1) returns (HelloResponse1) {} }
-
-
-
服务端流式RPC
一个请求对象,服务端可以返回多个结果对象
-
使用场景
client - server 股票 每时每刻都要发送 价值数据
-
语法
// 服务接口定义 service HelloService{ rpc hello(HelloRequest) returns (stream HelloResponse) {} // 服务端流式 RPC // 练习 rpc hello1(HelloRequest1) returns (HelloResponse1) {} // 一元RPC }
-
-
客户端流式RPC
指的是客户端发送多个请求对象,服务端只返回一个结果
-
应用场景
IOT 物联网【传感器】中应用较多 client -> server 传感器 服务端 例如车上的传感器每次都发送位置信息到服务端
-
语法
// 服务接口定义 service HelloService{ // 服务端流式RPC rpc c2ss(HelloRequest) returns (stream HelloResponse) {} // 客户端流式RPC rpc cs2s(stream HelloRequest) returns (HelloResponse) {} }
-
开发
1. api 编写 idl rpc 定义 2. server /** * 客户端流式RPC * @param responseObserver * @return StreamObserver */ @Override public StreamObserver<HelloProto.HelloRequest> cs2s(StreamObserver<HelloProto.HelloResponse> responseObserver) { // 监听客户端请求 return new StreamObserver<HelloProto.HelloRequest>() { @Override public void onNext(HelloProto.HelloRequest helloRequest) { System.out.println("接收到了客户端发送的一条消息:" + helloRequest.getName()); } @Override public void onError(Throwable throwable) { } @Override public void onCompleted() { System.out.println("client 的所有消息都发送到了服务端……"); // 提供响应:响应的目的是当接受了全部client的提交的信息,并处理后,提供响应 HelloProto.HelloResponse.Builder builder = HelloProto.HelloResponse.newBuilder(); builder.setResult("this is result!"); HelloProto.HelloResponse helloResponse = builder.build(); responseObserver.onNext(helloResponse); responseObserver.onCompleted(); } }; } 3. client public static void main(String[] args) { ManagedChannel localhost = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build(); try { HelloServiceGrpc.HelloServiceStub helloService = HelloServiceGrpc.newStub(localhost); // 监控响应处理 StreamObserver<HelloProto.HelloRequest> helloRequestStreamObserver = helloService.cs2s(new StreamObserver<HelloProto.HelloResponse>() { @Override public void onNext(HelloProto.HelloResponse helloResponse) { // 监控响应 System.out.println("服务端响应数据:" + helloResponse.getResult()); } @Override public void onError(Throwable throwable) { } @Override public void onCompleted() { System.out.println("服务端响应结束……"); } }); // 做请求处理 客户端发送数据到服务端 不定时发送 for (int i = 0; i < 10; i++) { HelloProto.HelloRequest.Builder builder = HelloProto.HelloRequest.newBuilder(); builder.setName("yuluo" + i); HelloProto.HelloRequest helloRequest = builder.build(); helloRequestStreamObserver.onNext(helloRequest); // 间隔一秒 Thread.sleep(1000); } // 当所有的消息全部发送给服务端时,告知服务端 helloRequestStreamObserver.onCompleted(); // 设置客户端等待 localhost.awaitTermination(12, TimeUnit.SECONDS); } catch (Exception e) { e.printStackTrace(); } finally { localhost.shutdown(); } }
-
-
双向流式RPC
客户端可以发送多个请求消息,服务可以发送多个响应消息
-
应用场景:
聊天室
-
编码
api service HelloService{ // 双向流式RPC rpc cs2ss(stream HelloRequest) returns (stream HelloResponse) {} } server /** * 双向流式RPC开发 * @param responseObserver * @return */ @Override public StreamObserver<HelloProto.HelloRequest> cs2ss(StreamObserver<HelloProto.HelloResponse> responseObserver) { // 处理客户端请求 return new StreamObserver<HelloProto.HelloRequest>() { @Override public void onNext(HelloProto.HelloRequest helloRequest) { System.out.println("服务端接受到客户端提交的信息:" + helloRequest.getName()); // 处理服务端响应 responseObserver.onNext(HelloProto.HelloResponse.newBuilder().setResult("response " + helloRequest.getName() + " result!").build()); } @Override public void onError(Throwable throwable) { } @Override public void onCompleted() { System.out.println("接收完成所有的客户端消息"); responseObserver.onCompleted(); } }; } client public static void main(String[] args) { ManagedChannel localhost = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build(); try { HelloServiceGrpc.HelloServiceStub helloService = HelloServiceGrpc.newStub(localhost); StreamObserver<HelloProto.HelloRequest> helloRequestStreamObserver = helloService.cs2ss(new StreamObserver<HelloProto.HelloResponse>() { @Override public void onNext(HelloProto.HelloResponse helloResponse) { System.out.println("服务端响应的结果" + helloResponse.getResult()); } @Override public void onError(Throwable throwable) { } @Override public void onCompleted() { System.out.println("服务端所有响应完成"); } }); for (int i = 0; i < 10; i++) { helloRequestStreamObserver.onNext(HelloProto.HelloRequest.newBuilder().setName("yuluo" + i).build()); } helloRequestStreamObserver.onCompleted(); localhost.awaitTermination(12, TimeUnit.SECONDS); } catch (Exception e) { e.printStackTrace(); } finally { localhost.shutdown(); } }
-
GRPC 代理方式
1. BlockingStub (常用)
阻塞,通信方式
2. Stub (常用)
异步,通过监听处理
3. FutureStub
同步,异步,类似 NettyFuture
1. FutureStub 只能应用在一元 RPC
2.
public class GrpcClient7 {
public static void main(String[] args) {
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
try {
TestServiceGrpc.TestServiceFutureStub testServiceFutureStub = TestServiceGrpc.newFutureStub(managedChannel);
ListenableFuture<TestProto.TestResponse> responseListenableFuture = testServiceFutureStub.testYuluo(
TestProto.TestRequest.newBuilder()
.setName("yuluo").build());
/* 同步操作
TestProto.TestResponse testResponse = responseListenableFuture.get();
System.out.println(testResponse.getResult());*/
// 异步操作
/* 不能和在实战中使用
responseListenableFuture.addListener(() -> {
System.out.println("异步的RPC响应回来了……");
}, Executors.newCachedThreadPool());*/
// 实战使用代码
Futures.addCallback(responseListenableFuture, new FutureCallback<TestProto.TestResponse>() {
@Override
public void onSuccess(TestProto.TestResponse testResponse) {
System.out.println("result.getResult() = " + testResponse.getResult());
}
@Override
public void onFailure(Throwable throwable) {
}
}, Executors.newCachedThreadPool());
System.out.println("后续的操作……");
// 避免异步操作执行太快,睡一会在执行
managedChannel.awaitTermination(12, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
} finally {
managedChannel.shutdown();
}
}
}
gRPC与Spring Boot整合
版本使用,需要注意:目前不支持spring boot3.0.x版本,使用2.7.6正常
gRP和Spring Boot整合的思想
1. grpc-client
2. grpc-server
Spring Boot与GRPC整合的过程中对于服务端封装
-
搭建开发环境
1. 搭建Spring Boot的开发环境 2. 引入与GRPC相关的内容
-
开发环境
// 引入服务端依赖 <dependency> <groupId>net.devh</groupId> <artifactId>grpc-server-spring-boot-starter</artifactId> <version>2.14.0.RELEASE</version> </dependency> // 服务接口开发 @GrpcService public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase { @Override public void hello(HelloProto.HelloRequest request, StreamObserver<HelloProto.HelloResponse> responseObserver) { String name = request.getName(); System.out.println("name is" + name); responseObserver.onNext(HelloProto.HelloResponse.newBuilder().setResult("this is result").build()); responseObserver.onCompleted(); } } # 核心配置内容 grpc的端口号 spring: application: # 在项目中必须显式声明应用名,没有默认值 name: boot-server # 不启动tomcat服务器,使用的是grpc的服务端 # main: # web-application-type: none grpc: server: port: 9000
// 引入客户端依赖 <dependency> <groupId>net.devh</groupId> <artifactId>grpc-client-spring-boot-starter</artifactId> <version>2.14.0.RELEASE</version> </dependency> // client开发 @RestController public class TestController { @GrpcClient("grpc-server") private HelloServiceGrpc.HelloServiceBlockingStub stub; @GetMapping("/test") public String test1(String name) { System.out.println("name = " + name); HelloProto.HelloResponse helloResponse = stub.hello(HelloProto.HelloRequest.newBuilder().setName(name).build()); return helloResponse.getResult(); } } // 配置 server: port: 8081 spring: application: name: boot-client grpc: client: grpc-server: address: 'static://localhost:9000' negotiation-type: plaintext
gRPC 的高级应用
1. 拦截器 一元拦截器
2. Stream Tracer 【监听流】 流拦截器
3. Retry Policy 客户端重试
4. NameResolver 注册中心
5. 负载均衡
6. GRPC与微服务整合
- 序列化 protobuf与dubbo
- grpc 与 Gateway
- grpc 与 JWT
- grpc 与 Nacos2.0
- grpc 可以替换 OpenFeign