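When building a gRPC application, whether on the client side or the server side, you may need to run some common logic before or after a remote method executes. gRPC lets you intercept RPC execution to meet requirements such as logging, authentication, and metrics collection, using an extension mechanism called interceptors. gRPC provides a simple API for implementing interceptors in both client and server applications. Depending on the type of RPC call being intercepted, gRPC interceptors fall into two categories: client-side interceptors and server-side interceptors.
Note: the demo in this article builds on the demo from the earlier article "Using gRPC on Android: the 4 communication modes".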
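Client-side interceptors
When a client issues an RPC to invoke a remote method of a gRPC service, the RPC can be intercepted on the client. Multiple interceptors can be added, each handling a different requirement. The interceptor flow is as follows:
Source code analysis
Take a unary RPC as an example. Start from the getOrderByUnary method, which calls getChannel().newCall. Reading the source shows that getChannel() returns a ManagedChannelImpl instance, and following newCall inside ManagedChannelImpl leads to the Channel that does the real work. The relevant code is: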
realChannel = new RealChannel(nameResolver.getServiceAuthority());
Channel channel = realChannel;
if (builder.binlog != null) {
    channel = builder.binlog.wrapChannel(channel);
}
this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
As shown, a RealChannel object is instantiated first, and then ClientInterceptors.intercept builds the interceptor chain. intercept is implemented as follows:
Preconditions.checkNotNull(channel, "channel");
for (ClientInterceptor interceptor : interceptors) {
    channel = new InterceptorChannel(channel, interceptor);
}
return channel;
InterceptorChannel links the current interceptor to the next Channel, and its newCall method calls the interceptor's interceptCall:
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
    return interceptor.interceptCall(method, callOptions, channel);
}
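To make the wrapping order concrete, here is a minimal sketch of the same loop using plain Java. SimpleChannel, SimpleInterceptor and InterceptorChainModel are hypothetical stand-ins invented for this illustration, not gRPC types; the only thing taken from the source above is the shape of the loop and of newCall.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-ins for io.grpc.Channel and io.grpc.ClientInterceptor (hypothetical, not the gRPC types).
interface SimpleChannel {
    void newCall(String method, List<String> trace);
}

interface SimpleInterceptor {
    void interceptCall(String method, List<String> trace, SimpleChannel next);
}

final class InterceptorChainModel {
    // Mirrors the loop above: every interceptor wraps the channel built so far,
    // so the LAST interceptor in the list ends up outermost and is invoked first.
    static SimpleChannel intercept(SimpleChannel channel, List<SimpleInterceptor> interceptors) {
        for (SimpleInterceptor interceptor : interceptors) {
            final SimpleChannel next = channel;
            // Plays the role of InterceptorChannel.newCall: delegate to the interceptor.
            channel = (method, trace) -> interceptor.interceptCall(method, trace, next);
        }
        return channel;
    }

    // An interceptor that records its name and then forwards to the next channel.
    static SimpleInterceptor named(String name) {
        return (method, trace, next) -> {
            trace.add(name);
            next.newCall(method, trace);
        };
    }

    static List<String> demo() {
        List<String> trace = new ArrayList<>();
        SimpleChannel real = (method, t) -> t.add("RealChannel");
        SimpleChannel chain = intercept(real, Arrays.asList(named("first"), named("second")));
        chain.newCall("getOrder", trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [second, first, RealChannel]
    }
}
```

In this model the interceptor listed last sees the call first, and the real channel is always reached last.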
Chaining the calls this way builds the interceptor request chain. getOrderByUnary also calls asyncUnaryCall, which eventually calls asyncUnaryRequestCall:
private static <ReqT, RespT> void asyncUnaryRequestCall(
        ClientCall<ReqT, RespT> call,
        ReqT req,
        StartableListener<RespT> responseListener) {
    startCall(call, responseListener);
    try {
        call.sendMessage(req);
        call.halfClose();
    } catch (RuntimeException e) {
        throw cancelThrow(call, e);
    } catch (Error e) {
        throw cancelThrow(call, e);
    }
}
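The sequence in asyncUnaryRequestCall (start, send the single request, half-close, and cancel on failure) can be sketched without gRPC. RecordingCall and UnaryCallModel below are hypothetical names made up for this illustration; the fake call only records which methods were invoked.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for io.grpc.ClientCall that only records what happens to it.
class RecordingCall {
    final List<String> events = new ArrayList<>();
    final boolean failOnSend;

    RecordingCall(boolean failOnSend) {
        this.failOnSend = failOnSend;
    }

    void start() {
        events.add("start");
    }

    void sendMessage(String req) {
        if (failOnSend) {
            throw new IllegalStateException("send failed");
        }
        events.add("sendMessage(" + req + ")");
    }

    void halfClose() {
        events.add("halfClose");
    }

    void cancel(Throwable cause) {
        events.add("cancel");
    }
}

final class UnaryCallModel {
    // Same shape as asyncUnaryRequestCall: start, send the one request, half-close,
    // and cancel the call before rethrowing if anything goes wrong.
    static void unaryRequest(RecordingCall call, String req) {
        call.start();
        try {
            call.sendMessage(req);
            call.halfClose();
        } catch (RuntimeException | Error e) {
            call.cancel(e);
            throw e;
        }
    }

    public static void main(String[] args) {
        RecordingCall ok = new RecordingCall(false);
        unaryRequest(ok, "0");
        System.out.println(ok.events); // [start, sendMessage(0), halfClose]

        RecordingCall failing = new RecordingCall(true);
        try {
            unaryRequest(failing, "0");
        } catch (IllegalStateException expected) {
            System.out.println(failing.events); // [start, cancel]
        }
    }
}
```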
Here startCall is called first, and startCall ends up calling the start method of ClientCall. Inside an interceptor, we in turn call start on the ClientCall of the next interceptor:
super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
    @Override
    public void onHeaders(Metadata headers) {
        // Headers sent back by the server
        Log.d(TAG," interceptCall : onHeaders = " + headers);
        super.onHeaders(headers);
    }

    @Override
    public void onClose(Status status, Metadata trailers) {
        Log.d(TAG," interceptCall : onClose status = " + status);
        super.onClose(status, trailers);
    }

    @Override
    public void onMessage(RespT message) {
        Log.d(TAG," interceptCall : onMessage message = " + message);
        super.onMessage(message);
    }

    @Override
    public void onReady() {
        Log.d(TAG," interceptCall : onReady");
        super.onReady();
    }
}, headers);
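The last call in the chain is the ClientCall created by RealChannel, whose implementation is ClientCallImpl. At this point the interceptor listener chain is fully built. The call relationships between the classes in the code are as follows:
Code implementation
Add two interceptors, MyClientInterceptor and MyClient2Interceptor, implemented as follows: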
import android.util.Log;

import androidx.annotation.Nullable; // assumption: the original does not show which @Nullable it imports

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;

public class MyClientInterceptor implements ClientInterceptor {
    private static final String TAG = "MyClientInterceptor";
    private static final Metadata.Key<String> TOKEN = Metadata.Key.of("token", Metadata.ASCII_STRING_MARSHALLER);

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                // The client passes data such as tracing information by putting it into the headers.
                Log.d(TAG," interceptCall : currentThreadName = "+Thread.currentThread().getName()+"start = " + headers);
                // Metadata.put appends a value; it does not replace an existing one for the same key.
                headers.put(TOKEN, "A2D05E5ED2414B1F8C6AEB19F40EF77C");
                try {
                    super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
                        @Override
                        public void onHeaders(Metadata headers) {
                            // Headers sent back by the server
                            Log.d(TAG," interceptCall : onHeaders = " + headers);
                            super.onHeaders(headers);
                        }

                        @Override
                        public void onClose(Status status, Metadata trailers) {
                            Log.d(TAG," interceptCall : onClose status = " + status);
                            super.onClose(status, trailers);
                        }

                        @Override
                        public void onMessage(RespT message) {
                            Log.d(TAG," interceptCall : onMessage message = " + message);
                            super.onMessage(message);
                        }
                    }, headers);
                } catch (Exception e) {
                    // Exceptions thrown while starting the call are only logged here.
                    Log.d(TAG," interceptCall : err = " + e.getMessage());
                }
            }

            @Override
            public void sendMessage(ReqT message) {
                Log.d(TAG," interceptCall : sendMessage currentThreadName = "+Thread.currentThread().getName()+ "message = " + message);
                super.sendMessage(message);
            }

            @Override
            public void cancel(@Nullable String message, @Nullable Throwable cause) {
                // cause may be null, so guard before reading its message.
                Log.d(TAG," interceptCall : currentThreadName = "+Thread.currentThread().getName()+" cancel message = " + message + " " + (cause == null ? null : cause.getMessage()));
                super.cancel(message, cause);
            }
        };
    }
}
import android.util.Log;

import androidx.annotation.Nullable; // assumption: the original does not show which @Nullable it imports

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;

public class MyClient2Interceptor implements ClientInterceptor {
    private static final String TAG = "MyClient2Interceptor";
    private static final Metadata.Key<String> TOKEN = Metadata.Key.of("token", Metadata.ASCII_STRING_MARSHALLER);

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                // The client passes data such as tracing information by putting it into the headers.
                Log.d(TAG," interceptCall : currentThreadName = "+Thread.currentThread().getName()+"start = " + headers);
                headers.put(TOKEN, "A2D05E5ED2414B1F8C6AEB19F40EF77C");
                try {
                    super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
                        @Override
                        public void onHeaders(Metadata headers) {
                            // Headers sent back by the server
                            Log.d(TAG," interceptCall : onHeaders = " + headers);
                            super.onHeaders(headers);
                        }

                        @Override
                        public void onClose(Status status, Metadata trailers) {
                            Log.d(TAG," interceptCall : onClose status = " + status);
                            super.onClose(status, trailers);
                        }

                        @Override
                        public void onMessage(RespT message) {
                            Log.d(TAG," interceptCall : onMessage message = " + message);
                            super.onMessage(message);
                        }
                    }, headers);
                } catch (Exception e) {
                    // Exceptions thrown while starting the call are only logged here.
                    Log.d(TAG," interceptCall : err = " + e.getMessage());
                }
            }

            @Override
            public void sendMessage(ReqT message) {
                Log.d(TAG," interceptCall : sendMessage currentThreadName = "+Thread.currentThread().getName()+ "message = " + message);
                super.sendMessage(message);
            }

            @Override
            public void cancel(@Nullable String message, @Nullable Throwable cause) {
                // cause may be null, so guard before reading its message.
                Log.d(TAG," interceptCall : currentThreadName = "+Thread.currentThread().getName()+" cancel message = " + message + " " + (cause == null ? null : cause.getMessage()));
                super.cancel(message, cause);
            }
        };
    }
}
Add the interceptors to the ManagedChannelBuilder:
.intercept(new MyClientInterceptor(),new MyClient2Interceptor())
Run results
Taking a unary RPC as an example, the output is as follows:
2022-12-21 11:12:56.728 24644-24930/com.reactnative.grpcserverdemo D/MyClient2Interceptor: interceptCall : currentThreadName = pool-3-thread-1start = Metadata()
2022-12-21 11:12:56.729 24644-24930/com.reactnative.grpcserverdemo D/MyClientInterceptor: interceptCall : currentThreadName = pool-3-thread-1start = Metadata(token=A2D05E5ED2414B1F8C6AEB19F40EF77C)
2022-12-21 11:12:56.744 24644-24930/com.reactnative.grpcserverdemo D/MyClient2Interceptor: interceptCall : sendMessage currentThreadName = pool-3-thread-1message = # com.google.protobuf.StringValue@7c65f
value: "0"
2022-12-21 11:12:56.746 24644-24930/com.reactnative.grpcserverdemo D/MyClientInterceptor: interceptCall : sendMessage currentThreadName = pool-3-thread-1message = # com.google.protobuf.StringValue@7c65f
value: "0"
2022-12-21 11:12:57.251 24644-24935/com.reactnative.grpcserverdemo D/MyClientInterceptor: interceptCall : onHeaders = Metadata(content-type=application/grpc,grpc-encoding=identity,grpc-accept-encoding=gzip)
2022-12-21 11:12:57.251 24644-24935/com.reactnative.grpcserverdemo D/MyClient2Interceptor: interceptCall : onHeaders = Metadata(content-type=application/grpc,grpc-encoding=identity,grpc-accept-encoding=gzip)
2022-12-21 11:12:57.291 24644-24935/com.reactnative.grpcserverdemo D/MyClientInterceptor: interceptCall : onMessage message = # order.OrderManagementOuterClass$Order@2e3553
id: "0-1"
2022-12-21 11:12:57.293 24644-24935/com.reactnative.grpcserverdemo D/MyClient2Interceptor: interceptCall : onMessage message = # order.OrderManagementOuterClass$Order@2e3553
id: "0-1"
2022-12-21 11:12:57.296 24644-24935/com.reactnative.grpcserverdemo D/MyClientInterceptor: interceptCall : onClose status = Status{code=OK, description=null, cause=null}
2022-12-21 11:12:57.297 24644-24935/com.reactnative.grpcserverdemo D/MyClient2Interceptor: interceptCall : onClose status = Status{code=OK, description=null, cause=null}
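The ordering in this log (MyClient2Interceptor first for start and sendMessage, MyClientInterceptor first for onHeaders, onMessage and onClose) follows from how the calls and listeners wrap each other. The sketch below models that with plain Java. SimpleCall, SimpleListener and ListenerOrderModel are hypothetical names for this illustration only, and the nesting is an assumption chosen to be consistent with the intercept loop shown earlier and with the log above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for ClientCall.Listener and ClientCall.
interface SimpleListener {
    void onMessage(String message);
}

interface SimpleCall {
    void start(SimpleListener listener);
}

final class ListenerOrderModel {
    // Wraps `next` the way a forwarding call would: log on the way out (start),
    // then pass a wrapping listener to the next call so it can log on the way back.
    static SimpleCall intercept(String name, SimpleCall next, List<String> log) {
        return listener -> {
            log.add(name + " start");
            next.start(message -> {
                log.add(name + " onMessage");
                listener.onMessage(message);
            });
        };
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        // Stand-in for the transport: immediately delivers one response.
        SimpleCall transport = listener -> listener.onMessage("0-1");
        // Assumed nesting: the interceptor listed last is the outermost wrapper.
        SimpleCall call = intercept("MyClient2Interceptor",
                intercept("MyClientInterceptor", transport, log), log);
        call.start(message -> log.add("application onMessage"));
        return log;
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

Requests travel from the outermost wrapper inward, while responses are delivered to the innermost listener first and then bubble outward, which is why the two interceptors swap places between the request and response halves of the log.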
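Server-side interceptors
When a client invokes a remote method of a gRPC service, a server-side interceptor can run common logic before the remote method executes. This is very useful when features such as authentication must be applied before the remote method is called. Multiple interceptors can also be added, each handling a different check. The interception flow is as follows:
Code implementation
Add two interceptors, MyServerInterceptor and MyServer2Interceptor, implemented as follows: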
import android.util.Log;

import io.grpc.ForwardingServerCall;
import io.grpc.ForwardingServerCallListener;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;

public class MyServerInterceptor implements ServerInterceptor {
    private static final String TAG = "MyServerInterceptor";

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
        Log.d(TAG," interceptCall : headers = " + headers);
        // Wrap the ServerCall to observe what the service sends back to the client.
        ServerCall.Listener<ReqT> delegate = next.startCall(new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
            @Override
            public void sendHeaders(Metadata headers) {
                Log.d(TAG, " interceptCall response : sendHeaders = " + headers);
                super.sendHeaders(headers);
            }

            @Override
            public void sendMessage(RespT message) {
                // Note: this logs the request headers captured from interceptCall, not the response message.
                Log.d(TAG, " interceptCall response : sendMessage = " + headers);
                super.sendMessage(message);
            }
        }, headers);
        // Wrap the listener to observe what arrives from the client.
        return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(delegate) {
            @Override
            public void onMessage(ReqT message) {
                super.onMessage(message);
                Log.d(TAG," interceptCall request : onMessage message = " + message);
            }

            @Override
            public void onReady() {
                super.onReady();
                Log.d(TAG," interceptCall request : onReady");
            }

            @Override
            public void onCancel() {
                super.onCancel();
                Log.d(TAG," interceptCall request : onCancel");
            }

            @Override
            public void onComplete() {
                super.onComplete();
                Log.d(TAG," interceptCall request : onComplete");
            }
        };
    }
}
import android.util.Log;

import io.grpc.ForwardingServerCall;
import io.grpc.ForwardingServerCallListener;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;

public class MyServer2Interceptor implements ServerInterceptor {
    private static final String TAG = "MyServer2Interceptor";

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
        Log.d(TAG," interceptCall : headers = " + headers);
        // Wrap the ServerCall to observe what the service sends back to the client.
        ServerCall.Listener<ReqT> delegate = next.startCall(new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
            @Override
            public void sendHeaders(Metadata headers) {
                Log.d(TAG, " interceptCall response : sendHeaders = " + headers);
                super.sendHeaders(headers);
            }

            @Override
            public void sendMessage(RespT message) {
                // Note: this logs the request headers captured from interceptCall, not the response message.
                Log.d(TAG, " interceptCall response : sendMessage = " + headers);
                super.sendMessage(message);
            }
        }, headers);
        // Wrap the listener to observe what arrives from the client.
        return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(delegate) {
            @Override
            public void onMessage(ReqT message) {
                super.onMessage(message);
                Log.d(TAG," interceptCall request : onMessage message = " + message);
            }

            @Override
            public void onReady() {
                super.onReady();
                Log.d(TAG," interceptCall request : onReady");
            }

            @Override
            public void onCancel() {
                super.onCancel();
                Log.d(TAG," interceptCall request : onCancel");
            }

            @Override
            public void onComplete() {
                super.onComplete();
                Log.d(TAG," interceptCall request : onComplete");
            }
        };
    }
}
Run results
Taking a unary RPC as an example, the output is as follows:
2022-12-29 15:20:54.257 3345-3804/com.reactnative.grpcserverdemo D/MyServer2Interceptor: interceptCall : headers = Metadata(user-agent=grpc-java-okhttp/1.42.0,content-type=application/grpc,token=A2D05E5ED2414B1F8C6AEB19F40EF77C,token=A2D05E5ED2414B1F8C6AEB19F40EF77C,grpc-accept-encoding=gzip)
2022-12-29 15:20:54.258 3345-3804/com.reactnative.grpcserverdemo D/MyServerInterceptor: interceptCall : headers = Metadata(user-agent=grpc-java-okhttp/1.42.0,content-type=application/grpc,token=A2D05E5ED2414B1F8C6AEB19F40EF77C,token=A2D05E5ED2414B1F8C6AEB19F40EF77C,grpc-accept-encoding=gzip)
2022-12-29 15:20:54.262 3345-3804/com.reactnative.grpcserverdemo D/MyServerInterceptor: interceptCall request : onReady
2022-12-29 15:20:54.262 3345-3804/com.reactnative.grpcserverdemo D/MyServer2Interceptor: interceptCall request : onReady
2022-12-29 15:20:54.277 3345-3804/com.reactnative.grpcserverdemo D/MyServerInterceptor: interceptCall request : onMessage message = # com.google.protobuf.StringValue@7c65f
value: "0"
2022-12-29 15:20:54.278 3345-3804/com.reactnative.grpcserverdemo D/MyServer2Interceptor: interceptCall request : onMessage message = # com.google.protobuf.StringValue@7c65f
value: "0"
2022-12-29 15:20:54.280 3345-3804/com.reactnative.grpcserverdemo D/MyServerInterceptor: interceptCall response : sendHeaders = Metadata()
2022-12-29 15:20:54.281 3345-3804/com.reactnative.grpcserverdemo D/MyServer2Interceptor: interceptCall response : sendHeaders = Metadata()
2022-12-29 15:20:54.303 3345-3804/com.reactnative.grpcserverdemo D/MyServerInterceptor: interceptCall response : sendMessage = Metadata(user-agent=grpc-java-okhttp/1.42.0,content-type=application/grpc,token=A2D05E5ED2414B1F8C6AEB19F40EF77C,token=A2D05E5ED2414B1F8C6AEB19F40EF77C,grpc-accept-encoding=gzip)
2022-12-29 15:20:54.304 3345-3804/com.reactnative.grpcserverdemo D/MyServer2Interceptor: interceptCall response : sendMessage = Metadata(user-agent=grpc-java-okhttp/1.42.0,content-type=application/grpc,token=A2D05E5ED2414B1F8C6AEB19F40EF77C,token=A2D05E5ED2414B1F8C6AEB19F40EF77C,grpc-accept-encoding=gzip)
2022-12-29 15:20:54.332 3345-3804/com.reactnative.grpcserverdemo D/MyServerInterceptor: interceptCall request : onComplete
2022-12-29 15:20:54.332 3345-3804/com.reactnative.grpcserverdemo D/MyServer2Interceptor: interceptCall request : onComplete
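The interceptors above only log. The authentication use case mentioned at the start of this section can be sketched as follows, again with plain Java rather than the gRPC types. SimpleHandler, SimpleServerInterceptor and AuthInterceptorModel are hypothetical names, and treating the token value from the demo as the expected credential is an assumption made purely for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins for ServerCallHandler and ServerInterceptor.
interface SimpleHandler {
    String handle(Map<String, String> headers, String request);
}

interface SimpleServerInterceptor {
    String intercept(Map<String, String> headers, String request, SimpleHandler next);
}

final class AuthInterceptorModel {
    // Assumption: the token the demo client sends is what the server expects.
    static final String EXPECTED_TOKEN = "A2D05E5ED2414B1F8C6AEB19F40EF77C";

    // Rejects the call before the handler runs unless the "token" header matches.
    static final SimpleServerInterceptor AUTH = (headers, request, next) -> {
        if (!EXPECTED_TOKEN.equals(headers.get("token"))) {
            return "UNAUTHENTICATED";
        }
        return next.handle(headers, request);
    };

    static String call(Map<String, String> headers, String request) {
        // Stand-in for the remote method.
        SimpleHandler getOrder = (h, req) -> "order " + req + "-1";
        return AUTH.intercept(headers, request, getOrder);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        System.out.println(call(headers, "0")); // UNAUTHENTICATED
        headers.put("token", EXPECTED_TOKEN);
        System.out.println(call(headers, "0")); // order 0-1
    }
}
```

In a real grpc-java ServerInterceptor, the rejection branch would typically close the ServerCall with Status.UNAUTHENTICATED and return a no-op listener instead of calling next.startCall, so the remote method never runs.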