开始前必读:基于grpc从零开始搭建一个准生产分布式应用(0) - quickStart
一、源码目录结构
二、GRPC拦截器源码
2.1、com.zd.baseframework.core.core.common.interceptor
package com.zd.baseframework.core.core.common.interceptor;
import com.zd.baseframework.core.core.common.interceptor.delegate.DelegateInterceptor;
import net.devh.boot.grpc.server.interceptor.GrpcGlobalServerInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
/**
 * Spring configuration that registers the global gRPC server interceptors.
 *
 * <p>Interceptors are declared as bean methods annotated with
 * {@code @GrpcGlobalServerInterceptor}. An {@code @Order} annotation on a bean method is
 * meant to control its position in the interceptor chain; the original author notes that
 * this did not seem to take effect and that declaration order had to be used instead
 * (NOTE(review): not verified here).
 *
 * @author liudong
 * @date 2022/1/13 4:40 PM
 */
@Configuration(proxyBeanMethods = false)
@Order(Ordered.LOWEST_PRECEDENCE)
public class GlobalServerInterceptorConfiguration {

    /**
     * Exposes a new {@link DelegateInterceptor} as a global server interceptor bean.
     *
     * @return a fresh interceptor instance
     */
    @Order(10000)
    @GrpcGlobalServerInterceptor
    DelegateInterceptor delegateInterceptor() {
        return new DelegateInterceptor();
    }
}
package com.zd.baseframework.core.core.common.interceptor.delegate;
import io.grpc.Context;
import io.grpc.Metadata;
/**
 * Constants shared by the logging interceptors.
 * Created by liudong on 2017/5/26.
 */
public class CONST {

    /** Single blank used as the separator when concatenating log fields. */
    public static final String SPLIT_BLANK = " ";

    /** Request-metadata key ("tid") under which the track id is looked up. */
    public static final Metadata.Key<String> TRACKID_METADATA_KEY =
            Metadata.Key.of("tid", Metadata.ASCII_STRING_MARSHALLER);

    /* Keys for values stored in the gRPC Context of the current call. */

    /** Context key for the time the request was received (stored as a string). */
    public static final Context.Key<String> TRACK_INTIME_KEY =
            Context.key("universe_trackInTimeKey");

    /** Context key for the pre-built access-log line of the request. */
    public static final Context.Key<String> TRACK_LOG_KEY =
            Context.key("universe_trackLogKey");

    /** Context key for the track id of the request. */
    public static final Context.Key<String> TRACK_LOG_UID_KEY =
            Context.key("universe_trackLogIdKey");
}
package com.zd.baseframework.core.core.common.interceptor.delegate;
import io.grpc.ForwardingServerCall;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import lombok.extern.slf4j.Slf4j;
/**
 * Server call wrapper that writes an "exec=" access-log line each time a response
 * message is sent, using the log prefix and arrival time stored in the gRPC
 * {@code Context} under {@link CONST#TRACK_LOG_KEY} and {@link CONST#TRACK_INTIME_KEY}.
 *
 * @param <ReqT>  request message type
 * @param <RespT> response message type
 */
@Slf4j
public class DelegateCall <ReqT, RespT> extends ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT> {

    /** Optional metadata attached by the owner of this call; not used by this class itself. */
    private Metadata metadata;

    /**
     * @param delegate the real server call that messages are forwarded to
     */
    public DelegateCall(ServerCall<ReqT, RespT> delegate) {
        super(delegate);
    }

    /**
     * Logs the elapsed time for the request and then forwards the message.
     *
     * <p>Logging is best effort: the message is always forwarded, even when the
     * tracking values are not available in the current context.
     *
     * @param message the response message to send
     */
    @Override
    public void sendMessage(RespT message) {
        logExecutionTime();
        super.sendMessage(message);
    }

    /** Writes the access-log prefix followed by "exec=&lt;elapsed millis&gt;", if the context holds both values. */
    private void logExecutionTime() {
        String trackLog = CONST.TRACK_LOG_KEY.get();
        String inTime = CONST.TRACK_INTIME_KEY.get();
        if (trackLog == null || inTime == null) {
            // The keys have no value when this method runs outside the context prepared
            // by the interceptor; skip logging instead of failing the response.
            log.debug("track context not available, skip exec log");
            return;
        }
        try {
            long elapsed = System.currentTimeMillis() - Long.parseLong(inTime);
            StringBuilder delegateLog = new StringBuilder(trackLog);
            delegateLog.append(CONST.SPLIT_BLANK)
                    .append("exec=").append(elapsed);
            log.info(delegateLog.toString());
        } catch (NumberFormatException e) {
            // A malformed timestamp must not prevent the response from being sent.
            log.debug("invalid inTime value in track context: " + inTime, e);
        }
    }

    /** @return the metadata previously set via {@link #setMetadata(Metadata)}, or {@code null} */
    public Metadata getMetadata() {
        return metadata;
    }

    /** @param metadata metadata to associate with this call */
    public void setMetadata(Metadata metadata) {
        this.metadata = metadata;
    }
}
package com.zd.baseframework.core.core.common.interceptor.delegate;
import io.grpc.ForwardingServerCallListener;
import io.grpc.ServerCall;
/**
 * Forwarding call listener that hands every callback to the wrapped listener and
 * can additionally hold a reference to the server call it belongs to.
 *
 * @param <ReqT>  request message type
 * @param <RespT> response message type
 */
public class DelegateCallListener< ReqT, RespT> extends ForwardingServerCallListener<ReqT> {

    /** Listener that receives all forwarded callbacks. */
    private final ServerCall.Listener<ReqT> delegate;

    /** Server call associated with this listener; {@code null} until set. */
    private ServerCall<ReqT, RespT> serverCall;

    /**
     * @param delegate the listener to forward callbacks to
     */
    public DelegateCallListener(ServerCall.Listener<ReqT> delegate) {
        this.delegate = delegate;
    }

    /** @return the wrapped listener */
    @Override
    protected ServerCall.Listener<ReqT> delegate() {
        return this.delegate;
    }

    /**
     * Forwards the incoming request message unchanged.
     *
     * @param message the request message received from the client
     */
    @Override
    public void onMessage(ReqT message) {
        // TODO inspect the incoming message here (e.g. SQL-injection checks)
        super.onMessage(message);
    }

    /** @param serverCall the server call to associate with this listener */
    public void setServerCall(ServerCall<ReqT, RespT> serverCall) {
        this.serverCall = serverCall;
    }

    /** @return the associated server call, or {@code null} if none was set */
    public ServerCall<ReqT, RespT> getServerCall() {
        return this.serverCall;
    }
}
package com.zd.baseframework.core.core.common.interceptor.delegate;
import com.zd.baseframework.core.core.common.token.TokenParser;
import io.grpc.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
/**
 * Access-log interceptor. It does not reject or alter any request; it only logs the
 * request line and stores tracking values in the gRPC {@code Context} for later use.
 *
 * <p>Two log lines are produced per response message, one here and one in
 * {@code DelegateCall}:
 * <pre>
 * tid=7537479305976007099 appid=Bearer ip=/127.0.0.1:64446 uri=universe.core.cases.ICaseService/GetCaseByCaseNum inTime=1642129705403
 * tid=7537479305976007099 appid=Bearer ip=/127.0.0.1:64446 uri=universe.core.cases.ICaseService/GetCaseByCaseNum inTime=1642129705403 exec=290
 * </pre>
 *
 * <ul>
 *   <li>tid: track id, used to follow a request across the call stack</li>
 *   <li>appid: id of the calling application</li>
 *   <li>ip: remote address and port of the caller</li>
 *   <li>uri: full method name requested by the client</li>
 *   <li>param: raw request parameters (listed in the original notes; not emitted by this code)</li>
 *   <li>inTime: timestamp at which the request was received</li>
 *   <li>exec: total execution time of the request</li>
 * </ul>
 *
 * @author liudong
 * @date 2022/1/13 4:44 PM
 */
@Slf4j
public class DelegateInterceptor implements ServerInterceptor {

    /**
     * Logs the incoming request, stores the tracking values in a derived {@code Context}
     * and starts the downstream call exactly once inside that context.
     *
     * @param serverCall        the call being intercepted
     * @param metadata          request headers; the track id is read from {@link CONST#TRACKID_METADATA_KEY}
     * @param serverCallHandler the next handler in the chain
     * @return a listener that forwards all callbacks to the downstream listener
     */
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
        long inTime = System.currentTimeMillis();

        // Reuse the caller's track id when present, otherwise generate one.
        String trackId = metadata.get(CONST.TRACKID_METADATA_KEY);
        if (StringUtils.isEmpty(trackId)) {
            trackId = String.valueOf(genLogId(System.nanoTime()));
        }

        StringBuilder delegateLog = new StringBuilder();
        delegateLog.append("tid=").append(trackId)
                .append(CONST.SPLIT_BLANK).append("appid=").append(TokenParser.appId())
                .append(CONST.SPLIT_BLANK).append("ip=").append(serverCall.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR))
                .append(CONST.SPLIT_BLANK).append("uri=").append(serverCall.getMethodDescriptor().getFullMethodName())
                .append(CONST.SPLIT_BLANK).append("inTime=").append(inTime)
                .append(CONST.SPLIT_BLANK);
        String requestLine = delegateLog.toString();

        // Store the arrival time, log prefix and track id in the call context so that
        // later stages (e.g. DelegateCall) can read them.
        Context ctx = Context.current()
                .withValue(CONST.TRACK_INTIME_KEY, String.valueOf(inTime))
                .withValue(CONST.TRACK_LOG_KEY, requestLine)
                .withValue(CONST.TRACK_LOG_UID_KEY, trackId);
        log.info(requestLine);

        // Pass the original call and metadata through untouched.
        DelegateCall<ReqT, RespT> serverCallDelegate = new DelegateCall<>(serverCall);

        // Contexts.interceptCall starts the downstream call itself. Calling
        // serverCallHandler.startCall(...) here as well would start the handler a
        // second time, outside the prepared context.
        ServerCall.Listener<ReqT> contextualListener =
                Contexts.interceptCall(ctx, serverCallDelegate, metadata, serverCallHandler);

        DelegateCallListener<ReqT, RespT> delegateCallListener = new DelegateCallListener<>(contextualListener);
        delegateCallListener.setServerCall(serverCall);
        return delegateCallListener;
    }

    /**
     * Builds a numeric id from the current time and the given value.
     *
     * <p>The low 48 bits of the current time in milliseconds are OR-ed with 16 bits
     * taken from {@code param} (bits 8..23) shifted left by 47. Because the shift is 47
     * while the time mask covers bits 0..47, bit 47 is shared by both parts.
     *
     * @param param extra value mixed into the id (the caller passes {@code System.nanoTime()})
     * @return the generated id
     */
    private long genLogId(long param) {
        long nowTime = System.currentTimeMillis();
        // 0xFFFFFFFFFFFFL == 281474976710655L (48 bits); 0xFFFFL == 65535L (16 bits).
        return (nowTime & 0xFFFFFFFFFFFFL) | (((param >> 8) & 0xFFFFL) << 47);
    }
}
2.2、com.zd.baseframework.core.core.common.loggenerator
package com.zd.baseframework.core.core.common.loggenerator;
import cn.hutool.core.util.StrUtil;
import com.zd.baseframework.core.core.common.interceptor.delegate.CONST;
import java.io.PrintWriter;
import java.io.StringWriter;
/**
 * Helpers for building log fragments: the per-request track prefix, tracking-point
 * fragments and exception fragments.
 *
 * <p>Log formats:
 * <pre>
 * tracking:  tid=7537479305976007099 appid=Bearer ip=/127.0.0.1:64446 uri=universe.core.cases.ICaseService/GetCaseByCaseNum inTime=1642129705403 k=s act=xxx
 * exception: tid=7537479305976007099 appid=Bearer ip=/127.0.0.1:64446 uri=universe.core.cases.ICaseService/GetCaseByCaseNum inTime=1642129705403 ep=xxx
 * </pre>
 * Created by liudong on 16/5/25.
 */
public final class LogGenerator {

    /**
     * Returns the track-log prefix stored under {@link CONST#TRACK_LOG_KEY}.
     *
     * @return the stored prefix, or an empty string when it is null or empty
     */
    public static String trackLog() {
        String prefix = CONST.TRACK_LOG_KEY.get();
        return StrUtil.isEmpty(prefix) ? "" : prefix;
    }

    /**
     * Returns the track id stored under {@link CONST#TRACK_LOG_UID_KEY}.
     *
     * @return the stored value as-is (no empty-string fallback is applied)
     */
    public static String trackUid() {
        return CONST.TRACK_LOG_UID_KEY.get();
    }

    /**
     * Builds a tracking-point fragment, normally appended to another log string.
     * Format: {@code " k=s act=<act> "} (leading and trailing blank included).
     *
     * @param act custom identifier of the tracking point
     * @return the formatted fragment
     */
    public static String logTracking(String act) {
        return CONST.SPLIT_BLANK + "k=s"
                + CONST.SPLIT_BLANK + "act=" + act
                + CONST.SPLIT_BLANK;
    }

    /**
     * Builds an exception fragment for use in controllers.
     * Format: {@code " ep=<stack trace> "} (leading and trailing blank included).
     *
     * @param exception the exception to render; {@code null} yields an empty trace
     * @return the formatted fragment
     */
    public static String logException(Exception exception) {
        return CONST.SPLIT_BLANK + "ep="
                + exception2String(exception)
                + CONST.SPLIT_BLANK;
    }

    /** Renders the full stack trace of {@code ex} as a string; empty string for {@code null}. */
    private static String exception2String(Exception ex) {
        if (ex == null) {
            return "";
        }
        StringWriter buffer = new StringWriter();
        try (PrintWriter printer = new PrintWriter(buffer)) {
            ex.printStackTrace(printer);
        }
        return buffer.toString();
    }
}
package com.zd.baseframework.core.core.common.token;
/**
 * Token parsing helper, kept as an extension point for later RPC-based use
 * (per the original author's note).
 *
 * @author liudong
 * @date 2022/1/13 5:08 PM
 */
public class TokenParser {

    /** Fixed application id returned for every call. */
    private static final String DEFAULT_APP_ID = "baseFrameWorkApp";

    /**
     * Returns the application id of the caller.
     *
     * @return currently always the constant {@code "baseFrameWorkApp"}
     */
    public static final String appId() {
        return DEFAULT_APP_ID;
    }
}
三、HTTP拦截器源码
package com.zd.baseframework.core.controller.core.advice;
import com.zd.baseframework.common.entity.http.BaseResponse;
import com.zd.baseframework.common.exceptions.AppException;
import io.grpc.StatusRuntimeException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
@Slf4j
@ControllerAdvice(value = {"com.zd.baseframework.core.controller.core"})
public class HttpExceptionAdvice {
@ResponseStatus(code = HttpStatus.OK)
@ExceptionHandler(value = {AppException.class, StatusRuntimeException.class})
@ResponseBody
public BaseResponse dealKnownException(Exception e) {
log.error("VMException: " + e.getMessage(), e);
if (e instanceof StatusRuntimeException) {
StatusRuntimeException vmException = (StatusRuntimeException) e;
if (vmException.getStatus() != null && vmException.getStatus().getCode() != null) {
return BaseResponse.error(vmException.getStatus().getCode().value(), e.getMessage());
} else {
return BaseResponse.error(e.getMessage());
}
}else if (e instanceof AppException) {
AppException vmException = (AppException) e;
if (vmException.getStatus() != null) {
return BaseResponse.error(vmException.getStatus(), e.getMessage());
} else {
return BaseResponse.error(e.getMessage());
}
} else {
return BaseResponse.error(e.getMessage());
}
}
}