GRPC的通信模式
GRPC四种基础通信模式:一元RPC、服务器端流RPC、客户端流RPC以及双向流RPC。下面分别四种模式进行介绍,并在Android环境中来实现服务和客户端。
一元RPC
一元RPC模式也被称为简单RPC模式。在该模式中,当客户端调用服务端的远程方法时,客户端发送请求值服务端并获得一个响应,与响应一起发送的还有状态细节以及trailer元数据。客户端与服务端流程如下:
代码实现
假设需要为基于GRPC的在线零售应用程序构建OrderManagement服务,并在该服务中定义getOrderByUnary方法。使用该方法,客户端可以通过订单ID检索已有的订单。客户端发送一个带有订单的ID的请求,服务器给出响应,响应中包含订单信息。因此,他遵循一元RPC模式。
协议定义
首先定义服务与消息类型,在main/proto下创建order_management.proto文件,内容如下:
syntax = "proto3";
import "google/protobuf/wrappers.proto";
package order;
service OrderManagement {
rpc getOrderByUnary(google.protobuf.StringValue) returns (Order);
}
message Order {
string id = 1;
}
客户端
创建OrderClient类作为客户端,具体代码代码实现如下:
private final String TAG = "OrderClient";
private ManagedChannel channel;
private OrderManagementGrpc.OrderManagementStub stub;
public OrderClient(String ip,int port){
channel = ManagedChannelBuilder
.forAddress(ip, port)
.usePlaintext()
.build();
stub = OrderManagementGrpc.newStub(channel); // 生成一个远端服务在client的存根,看名称应该是阻塞调用
}
public void getOrderByUnary(String id){
Log.d(TAG,"getOrderByUnary start id = " + id);
stub.getOrderByUnary(StringValue.newBuilder().setValue(id).build(), new StreamObserver<OrderManagementOuterClass.Order>() {
@Override
public void onNext(OrderManagementOuterClass.Order value) {
Log.d(TAG,"getOrderByUnary onNext : id " + value.getId());
}
@Override
public void onError(Throwable t) {
Log.d(TAG,"getOrderByUnary onError: " +t.getMessage());
}
@Override
public void onCompleted() {
Log.d(TAG,"getOrderByUnary onCompleted.");
}
});
}
}
服务端
创建OrderServer类作为服务端,具体代码如下:
public class OrderServer extends OrderManagementGrpc.OrderManagementImplBase {
private final String TAG = "OrderServer";
private Server server;
private int port;
public OrderServer(int port){
this.port = port;
}
public void start(){
Log.d(TAG," start");
try {
server = NettyServerBuilder.forPort(port)
.addService(this)
.build()
.start();
} catch (Exception e) {
e.printStackTrace();
Log.d(TAG," start exception = " + e.getMessage());
}
Log.d(TAG," port = " + port);
//等待服务关闭,这样是服务一直等待使用的状态了
try {
server.awaitTermination();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void getOrderByUnary(StringValue request, StreamObserver<OrderManagementOuterClass.Order> responseObserver) {
Log.d(TAG," getOrderByUnary request = " + request.getValue());
OrderManagementOuterClass.Order order = OrderManagementOuterClass
.Order.newBuilder().setId(request.getValue() +"-1").build();
responseObserver.onNext(order);
responseObserver.onCompleted();
}
}
使用
首先创建服务类,并启动服务,之后创建客户端,并调用getOrderByUnary方法,具体实现如下:
public class MainServerActivity extends AppCompatActivity {
private final String TAG = "MainServerActivity";
private Scheduler.Worker workerServer;
private Scheduler.Worker workerClient;
int port = 3452;
private int id;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Scheduler scheduler =
Schedulers.from(Executors.newSingleThreadScheduledExecutor());
workerServer = scheduler.createWorker();
workerServer.schedule(() ->{
startServer();
});
scheduler = Schedulers.from(Executors.newSingleThreadScheduledExecutor());
workerClient = scheduler.createWorker();
client = new OrderClient("127.0.0.1",port);
findViewById(R.id.bt_get_order_unary).setOnClickListener(v -> {
workerClient.schedule(() ->{
getOrderByUnary(id++ +"");
});
});
}
OrderServer server;
public void startServer(){
server = new OrderServer(port);
server.start();
}
OrderClient client;
public void getOrderByUnary(String id){
client.getOrderByUnary(id);
}
}
执行结果如下所示:
2022-12-15 15:04:50.987 30372-30503/com.reactnative.grpcserverdemo D/OrderClient: getOrderByUnary start id = 0
2022-12-15 15:04:51.550 30372-30505/com.reactnative.grpcserverdemo D/OrderServer: getOrderByUnary request = 0
2022-12-15 15:04:51.594 30372-30505/com.reactnative.grpcserverdemo D/OrderClient: getOrderByUnary onNext : id 0-1
2022-12-15 15:04:51.598 30372-30505/com.reactnative.grpcserverdemo D/OrderClient: getOrderByUnary onCompleted.
服务器端流RPC
在一元RPC模式中,GRPC服务器端和GRPC客户端在通信时始终只有一个请求和一个响应。在服务器端流RPC模式中,服务器端在接收到客户端请求消息后,会发回一个响应的序列。这种多个响应所组成的列表页被称为“流”。在讲所有的服务端响应发送完成之后,服务器端会以trailer元数据的形式将其状态发送给客户端,从而标记流的结束。客户端与服务端流程如下:
协议定义
在之前定义好服务中增加getOrderByServerStream方法,具体如下:
rpc getOrderByServerStream(google.protobuf.StringValue) returns ( stream Order);
客户端
在客户OrderClient类中增加getOrderByServerStream方法,代码如下:
public void getOrderByServerStream(String id){
Log.d(TAG,"getOrderByServerStream start id = " + id);
stub.getOrderByServerStream(StringValue.newBuilder().setValue(id).build(), new StreamObserver<OrderManagementOuterClass.Order>() {
@Override
public void onNext(OrderManagementOuterClass.Order value) {
Log.d(TAG,"getOrderByServerStream onNext : id " + value.getId());
}
@Override
public void onError(Throwable t) {
Log.d(TAG,"getOrderByServerStream onError: " +t.getMessage());
}
@Override
public void onCompleted() {
Log.d(TAG,"getOrderByServerStream onCompleted.");
}
});
}
服务端
在服务端OrderServer类中增加getOrderByServerStream方法,具体实现如下:
public void getOrderByServerStream(StringValue request, StreamObserver<OrderManagementOuterClass.Order> responseObserver) {
Log.d(TAG," getOrderByServerStream request = " + request.getValue());
OrderManagementOuterClass.Order order = OrderManagementOuterClass
.Order.newBuilder().setId(request.getValue() +"-1").build();
responseObserver.onNext(order);
order = OrderManagementOuterClass
.Order.newBuilder().setId(request.getValue() +"-2").build();
responseObserver.onNext(order);
responseObserver.onCompleted();
}
使用
之后调用getOrderByServerStream,具体实现如下:
findViewById(R.id.bt_get_order_server_stream).setOnClickListener(v -> {
workerClient.schedule(() ->{
getOrderByServerStream(id++ +"");
});
});
public void getOrderByServerStream(String id){
client.getOrderByServerStream(id);
}
执行结果如下所示:
2022-12-15 15:21:05.168 30372-31756/com.reactnative.grpcserverdemo D/OrderClient: getOrderByServerStream start id = 0
2022-12-15 15:21:05.265 30372-31760/com.reactnative.grpcserverdemo D/OrderServer: getOrderByServerStream request = 0
2022-12-15 15:21:05.309 30372-31757/com.reactnative.grpcserverdemo D/OrderClient: getOrderByServerStream onNext : id 0-1
2022-12-15 15:21:05.313 30372-31757/com.reactnative.grpcserverdemo D/OrderClient: getOrderByServerStream onNext : id 0-2
2022-12-15 15:21:05.314 30372-31757/com.reactnative.grpcserverdemo D/OrderClient: getOrderByServerStream onCompleted.
可见,服务端可以进行多次响应。
客户端流RPC
在客户端流RPC模式中个,客户端会发送多个请求给服务器端,而不再是单个请求。服务器端则会发送一个响应给客户端。但是服务器端不一定要等到从客户端接收到所有消息才发送响应。基于这样额逻辑,我们可以在接收到流中一个消息或几条消息之后就发送响应,也可以在读取完流中所有消息之后再发送响应。客户端与服务端流程如下:
协议定义
在之前定义好服务中增加getOrderByClientStream方法,具体如下:
rpc getOrderByClientStream(stream google.protobuf.StringValue) returns (Order);
客户端
在客户OrderClient类中增加getOrderByClientStream方法,代码如下:
public void getOrderByClientStream(){
StreamObserver<StringValue> observer = stub.getOrderByClientStream(new StreamObserver<OrderManagementOuterClass.Order>() {
@Override
public void onNext(OrderManagementOuterClass.Order value) {
Log.d(TAG,"getOrderByClientStream onNext : id = " + value.getId());
}
@Override
public void onError(Throwable t) {
Log.d(TAG,"getOrderByServerStream onError: " +t.getMessage());
}
@Override
public void onCompleted() {
Log.d(TAG,"getOrderByClientStream onCompleted.");
}
});
observer.onNext(StringValue.newBuilder().setValue("1").build());
observer.onNext(StringValue.newBuilder().setValue("2").build());
observer.onCompleted();
}
服务端
在服务端OrderServer类中增加getOrderByClientStream方法,具体实现如下:
public StreamObserver<StringValue> getOrderByClientStream(StreamObserver<OrderManagementOuterClass.Order> responseObserver) {
return new StreamObserver<StringValue>() {
@Override
public void onNext(StringValue value) {
Log.d(TAG," getOrderByClientStream onNext request = " + value);
}
@Override
public void onError(Throwable t) {
Log.d(TAG," getOrderByClientStream onError = " + t.getMessage());
}
@Override
public void onCompleted() {
Log.d(TAG," getOrderByClientStream onCompleted");
responseObserver.onNext(OrderManagementOuterClass
.Order.newBuilder().setId("1").build());
responseObserver.onCompleted();
}
};
}
使用
之后调用getOrderByClientStream,具体实现如下:
findViewById(R.id.bt_get_order_client_stream).setOnClickListener(v -> {
workerClient.schedule(() ->{
getOrderByClientStream();
});
});
public void getOrderByClientStream(){
client.getOrderByClientStream();
}
执行结果如下所示:
2022-12-15 16:16:14.013 5112-5203/com.reactnative.grpcserverdemo D/OrderServer: getOrderByClientStream onNext request = # com.google.protobuf.StringValue@7c694
value: "1"
2022-12-15 16:16:14.024 5112-5203/com.reactnative.grpcserverdemo D/OrderServer: getOrderByClientStream onNext request = # com.google.protobuf.StringValue@7c6c9
value: "2"
2022-12-15 16:16:14.025 5112-5203/com.reactnative.grpcserverdemo D/OrderServer: getOrderByClientStream onCompleted
2022-12-15 16:16:14.063 5112-5203/com.reactnative.grpcserverdemo D/OrderClient: getOrderByClientStream onNext : id = 1
2022-12-15 16:16:14.066 5112-5203/com.reactnative.grpcserverdemo D/OrderClient: getOrderByClientStream onCompleted.
双向流RPC
在双向流RPC模式中,客户端已消息流的形式发送请求到服务端,服务器也以消息流的形式进行响应。调用必须有客户端发起,但在此之后,通信完全基于GRPC客户端和服务端的应用程序逻辑。客户端与服务端流程如下:
协议定义
在之前定义好服务中增加getOrderByTwoWayStream方法,具体如下:
rpc getOrderByTwoWayStream(stream google.protobuf.StringValue) returns (stream Order);
客户端
在客户OrderClient类中增加getOrderByTwoWayStream方法,代码如下:
public void getOrderByTwoWayStream(){
StreamObserver<StringValue> observer = stub.getOrderByTwoWayStream(new StreamObserver<OrderManagementOuterClass.Order>() {
@Override
public void onNext(OrderManagementOuterClass.Order value) {
Log.d(TAG,"getOrderByTwoWayStream onNext : id = " + value.getId());
}
@Override
public void onError(Throwable t) {
Log.d(TAG,"getOrderByTwoWayStream onError: " +t.getMessage());
}
@Override
public void onCompleted() {
Log.d(TAG,"getOrderByTwoWayStream onCompleted.");
}
});
observer.onNext(StringValue.newBuilder().setValue("1").build());
observer.onNext(StringValue.newBuilder().setValue("2").build());
observer.onCompleted();
}
服务端
在服务端OrderServer类中增加getOrderByTwoWayStream方法,具体实现如下:
public StreamObserver<StringValue> getOrderByTwoWayStream(StreamObserver<OrderManagementOuterClass.Order> responseObserver) {
return new StreamObserver<StringValue>() {
@Override
public void onNext(StringValue value) {
Log.d(TAG," getOrderByTwoWayStream onNext request = " + value);
responseObserver.onNext(OrderManagementOuterClass
.Order.newBuilder().setId(value.getValue() + "-1").build());
}
@Override
public void onError(Throwable t) {
Log.d(TAG," getOrderByTwoWayStream onError = " + t.getMessage());
}
@Override
public void onCompleted() {
Log.d(TAG," getOrderByTwoWayStream onCompleted");
responseObserver.onCompleted();
}
};
}
使用
之后调用getOrderByTwoWayStream,具体实现如下:
findViewById(R.id.bt_get_order_two_way_stream).setOnClickListener(v -> {
workerClient.schedule(() ->{
getOrderByTwoWayStream();
});
});
public void getOrderByTwoWayStream(){
client.getOrderByTwoWayStream();
}
执行结果如下所示:
2022-12-15 16:30:31.223 6092-6260/com.reactnative.grpcserverdemo D/OrderServer: getOrderByTwoWayStream onNext request = # com.google.protobuf.StringValue@7c694
value: "1"
2022-12-15 16:30:31.289 6092-6260/com.reactnative.grpcserverdemo D/OrderServer: getOrderByTwoWayStream onNext request = # com.google.protobuf.StringValue@7c6c9
value: "2"
2022-12-15 16:30:31.299 6092-6260/com.reactnative.grpcserverdemo D/OrderServer: getOrderByTwoWayStream onCompleted
2022-12-15 16:30:31.304 6092-6260/com.reactnative.grpcserverdemo D/OrderClient: getOrderByTwoWayStream onNext : id = 1-1
2022-12-15 16:30:31.324 6092-6260/com.reactnative.grpcserverdemo D/OrderClient: getOrderByTwoWayStream onNext : id = 2-1
2022-12-15 16:30:31.328 6092-6260/com.reactnative.grpcserverdemo D/OrderClient: getOrderByTwoWayStream onCompleted.