最近在使用jvm-sandbox-repeater做引流回放,针对dubbo的流量做相应的回放,结果发现一个问题dubbo接口录制的返回值与回放的返回值不一样。
问题排查
我们先看看返回的差异在什么地方
左侧是回放的流量返回结果, 右侧是录制的返回结果。 结果发现流量回访多了一个pages
, 少了一个optimizeJoinOfCountSql
, 我们这里先不考虑class 。
这里首先怀疑的对象是我们回放的时候mock的逻辑出了异常,可是经过调试以后这里主要是通过arthas watch了对应的dubbo方法的返回值,发现确实就是我们mock的结果值, 所以可以排除不是我们mock那块的问题,那为什么dubbo返回结果就有差异呢?
这里就要说到一个问题,我们dubbo服务的回放逻辑其实用的是dubbo的泛化调用的原理,它与正常的服务间的dubbo调用还是有一定的差异的。所以有问题 也就是这块了。那怎么确定确实是因为这个原因的,突然想起来,我们部分还专门做一个通过http转dubbo调用的服务,也是通过dubbo泛化来调用的,我只要验证下,这块是否有问题就知道是不是一样的了。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3vXzIMJ4-1681951293088)(https://s3-us-west-2.amazonaws.com/secure.notion-static.com/fd8db0cd-2f6c-4b87-9d26-1bd65ac05b0e/Untitled.png)]
结果确实是的,那问题就很明显了,确实是在dubbo泛化这块出了问题,那具体是什么问题呢,我们问问gpt看看。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WGzLVb08-1681951293089)(https://s3-us-west-2.amazonaws.com/secure.notion-static.com/692b9408-5dbd-4d00-a26a-5730e1c3086c/Untitled.png)]
这么看来,问题出在序列化的嫌疑是最大的了。
不够 这个内容讲的不太详细, 找到了一篇文章 **Dubbo源码分析 - 泛化调用
我们需要看这块的代码逻辑
@Activate(group = Constants.PROVIDER, order = -20000)
public class GenericFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
// 1 是否是泛化调用: 方法名:$invoke & 参数个数:3 & 调用接口非 GenericService
if (inv.getMethodName().equals(Constants.$INVOKE)
&& inv.getArguments() != null && inv.getArguments().length == 3
&& !invoker.getInterface().equals(GenericService.class)) {
// 调用的服务方法名
String name = ((String) inv.getArguments()[0]).trim();
// 调用的服务方法参数类型
String[] types = (String[]) inv.getArguments()[1];
// 嗲用的服务方法参数列表
Object[] args = (Object[]) inv.getArguments()[2];
try {
// 2. 反射获得提供方的方法对象,注意这里的 invoker 是服务端的,因此 interface 是服务接口,而非GenericService
Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);
// 2.1 获取服务提供方的目标方法的参数类型
Class<?>[] params = method.getParameterTypes();
if (args == null) {
args = new Object[params.length];
}
// 3 获得 generic 配置项
String generic = inv.getAttachment(Constants.GENERIC_KEY);
if (StringUtils.isBlank(generic)) {
generic = RpcContext.getContext().getAttachment(Constants.GENERIC_KEY);
}
// 4 根据 generic 的配置项反序列化参数值
// 4.1 如果没有设置 generic 或者 generic = true,反序列化参数,Map->Pojo (在 java 中,pojo通常用map来表示)
if (StringUtils.isEmpty(generic) || ProtocolUtils.isDefaultGenericSerialization(generic)) {
args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
// 4.2 generic = nativejava, 反序列化参数, byte[]-> Pojo
} else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
for (int i = 0; i < args.length; i++) {
if (byte[].class == args[i].getClass()) {
try {
UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i]);
args[i] = ExtensionLoader.getExtensionLoader(Serialization.class)
.getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
.deserialize(null, is).readObject();
} catch (Exception e) {
throw new RpcException("Deserialize argument [" + (i + 1) + "] failed.", e);
}
} else {
throw new RpcException(
"Generic serialization [" +
Constants.GENERIC_SERIALIZATION_NATIVE_JAVA +
"] only support message type " +
byte[].class +
" and your message type is " +
args[i].getClass());
}
}
// 4.3 generic = bean ,反序列化参数,JavaBeanDescriptor -> Pojo
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
for (int i = 0; i < args.length; i++) {
if (args[i] instanceof JavaBeanDescriptor) {
args[i] = JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) args[i]);
} else {
throw new RpcException(
"Generic serialization [" +
Constants.GENERIC_SERIALIZATION_BEAN +
"] only support message type " +
JavaBeanDescriptor.class.getName() +
" and your message type is " +
args[i].getClass().getName());
}
}
}
// 5 方法参数转换完毕,进行方法调用。
// 注意此时创建了一个新的 RpcInvocation 对象。$invoke 泛化调用被转为具体的普通调用
Result result = invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));
// 如果调用结果有异常,并且非GenericException异常,则使用 GenericException 包装
if (result.hasException() && !(result.getException() instanceof GenericException)) {
return new RpcResult(new GenericException(result.getException()));
}
// generic=nativejava的情况下,序列化结果, 结果 -> btyp[]
if (ProtocolUtils.isJavaGenericSerialization(generic)) {
try {
UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(512);
ExtensionLoader.getExtensionLoader(Serialization.class)
.getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
.serialize(null, os).writeObject(result.getValue());
return new RpcResult(os.toByteArray());
} catch (IOException e) {
throw new RpcException("Serialize result failed.", e);
}
// generic=bean 的情况下,序列化结果, 结果 -> JavaBeanDescriptor
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
return new RpcResult(JavaBeanSerializeUtil.serialize(result.getValue(), JavaBeanAccessor.METHOD));
// generic=true 的情况下,序列化结果,Pojo -> Map
} else {
return new RpcResult(PojoUtils.generalize(result.getValue()));
}
} catch (NoSuchMethodException e) {
throw new RpcException(e.getMessage(), e);
} catch (ClassNotFoundException e) {
throw new RpcException(e.getMessage(), e);
}
}
// 普通调用(包括调用泛化实现)
return invoker.invoke(inv);
}
}
这段代码的意思如下: 以下来自GPT:
这是一段 Java 代码,实现了一个过滤器,用于处理 Dubbo RPC 框架中的泛化调用。泛化调用是指通过传递方法名、参数类型和参数值来调用服务方法,而不需要使用服务接口的实现类。
具体来说,该过滤器先判断是否是泛化调用,如果是,就进行反序列化参数值的处理,然后将泛化调用转换为具体的普通调用,即调用服务提供方的具体方法,最后将返回结果进行序列化处理。
该过滤器的实现具体步骤如下:
-
判断是否是泛化调用,如果是,则获取调用的服务方法名、参数类型和参数值。
-
通过反射获得服务提供方的方法对象,并获取该方法的参数类型。
-
获取泛化配置项。
-
根据泛化配置项进行反序列化参数值的处理,具体分为以下几种情况:
a. 如果没有设置泛化配置项或者泛化配置项为 true,则将参数值反序列化为 POJO(在 Java 中,POJO 通常用 Map 来表示)。
b. 如果泛化配置项为 nativejava,则将参数值反序列化为 byte[],再将其转换为 POJO。
c. 如果泛化配置项为 bean,则将参数值反序列化为 JavaBeanDescriptor 对象,再将其转换为 POJO。
-
进行方法调用,注意此时创建了一个新的 RpcInvocation 对象,将泛化调用转换为具体的普通调用。
-
如果调用结果有异常,并且非 GenericException 异常,则使用 GenericException 包装。
-
根据泛化配置项进行序列化处理,具体分为以下几种情况:
a. 如果泛化配置项为 true,则将返回结果序列化为 Map。
b. 如果泛化配置项为 nativejava,则将返回结果序列化为 byte[]。
c. 如果泛化配置项为 bean,则将返回结果序列化为 JavaBeanDescriptor 对象。
所以这里我们重点需要看的时候将结果序列化为map的逻辑是如何的就好了,也就是7.1
所以我们要关注的是 PojoUtils
的 generalize
方法
Map<String, Object> map = new HashMap<String, Object>();
history.put(pojo, map);
map.put("class", pojo.getClass().getName());
for (Method method : pojo.getClass().getMethods()) {
if (ReflectUtils.isBeanPropertyReadMethod(method)) {
try {
map.put(ReflectUtils.getPropertyNameFromBeanReadMethod(method),
generalize(method.invoke(pojo), history));
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
}
// public field
for(Field field : pojo.getClass().getFields()) {
if (ReflectUtils.isPublicInstanceField(field)) {
try {
Object fieldValue = field.get(pojo);
// public filed同时也有get/set方法,如果get/set存取的不是前面那个 public field 该如何处理
if (history.containsKey(pojo)) {
Object pojoGenerilizedValue = history.get(pojo);
if (pojoGenerilizedValue instanceof Map
&& ((Map)pojoGenerilizedValue).containsKey(field.getName())) {
continue;
}
}
if (fieldValue != null) {
map.put(field.getName(), generalize(fieldValue, history));
}
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
}
return map;
这块的代码就解释的很清楚了。
map其实就是最后返回给我的对象数据, 而它里面的内容是通过反射类中的getter以及is的方法拿到对象的,当然还有包括后面的public的成员变量了。
所以归根到底我们还是需要去看看,这个dubbo方法返回的类型是什么样的才行了。我们可以看下
public class Page<T> implements IPage<T> {
private static final long serialVersionUID = 8545996863226528798L;
protected List<T> records;
protected long total;
protected long size;
protected long current;
protected List<OrderItem> orders;
protected boolean optimizeCountSql;
protected boolean searchCount;
protected boolean optimizeJoinOfCountSql;
protected String countId;
protected Long maxLimit;
public Page() {
this.records = Collections.emptyList();
this.total = 0L;
this.size = 10L;
this.current = 1L;
this.orders = new ArrayList();
this.optimizeCountSql = true;
this.searchCount = true;
this.optimizeJoinOfCountSql = true;
}
public Page(long current, long size) {
this(current, size, 0L);
}
public Page(long current, long size, long total) {
this(current, size, total, true);
}
public Page(long current, long size, boolean searchCount) {
this(current, size, 0L, searchCount);
}
public Page(long current, long size, long total, boolean searchCount) {
this.records = Collections.emptyList();
this.total = 0L;
this.size = 10L;
this.current = 1L;
this.orders = new ArrayList();
this.optimizeCountSql = true;
this.searchCount = true;
this.optimizeJoinOfCountSql = true;
if (current > 1L) {
this.current = current;
}
this.size = size;
this.total = total;
this.searchCount = searchCount;
}
public boolean hasPrevious() {
return this.current > 1L;
}
public boolean hasNext() {
return this.current < this.getPages();
}
public List<T> getRecords() {
return this.records;
}
public Page<T> setRecords(List<T> records) {
this.records = records;
return this;
}
public long getTotal() {
return this.total;
}
public Page<T> setTotal(long total) {
this.total = total;
return this;
}
public long getSize() {
return this.size;
}
public Page<T> setSize(long size) {
this.size = size;
return this;
}
public long getCurrent() {
return this.current;
}
public Page<T> setCurrent(long current) {
this.current = current;
return this;
}
public String countId() {
return this.countId;
}
public Long maxLimit() {
return this.maxLimit;
}
private String[] mapOrderToArray(Predicate<OrderItem> filter) {
List<String> columns = new ArrayList(this.orders.size());
this.orders.forEach((i) -> {
if (filter.test(i)) {
columns.add(i.getColumn());
}
});
return (String[])columns.toArray(new String[0]);
}
private void removeOrder(Predicate<OrderItem> filter) {
for(int i = this.orders.size() - 1; i >= 0; --i) {
if (filter.test(this.orders.get(i))) {
this.orders.remove(i);
}
}
}
public Page<T> addOrder(OrderItem... items) {
this.orders.addAll(Arrays.asList(items));
return this;
}
public Page<T> addOrder(List<OrderItem> items) {
this.orders.addAll(items);
return this;
}
public List<OrderItem> orders() {
return this.orders;
}
public boolean optimizeCountSql() {
return this.optimizeCountSql;
}
public static <T> Page<T> of(long current, long size, long total, boolean searchCount) {
return new Page(current, size, total, searchCount);
}
public boolean optimizeJoinOfCountSql() {
return this.optimizeJoinOfCountSql;
}
public Page<T> setSearchCount(boolean searchCount) {
this.searchCount = searchCount;
return this;
}
public Page<T> setOptimizeCountSql(boolean optimizeCountSql) {
this.optimizeCountSql = optimizeCountSql;
return this;
}
public long getPages() {
return super.getPages();
}
public static <T> Page<T> of(long current, long size) {
return of(current, size, 0L);
}
public static <T> Page<T> of(long current, long size, long total) {
return of(current, size, total, true);
}
public static <T> Page<T> of(long current, long size, boolean searchCount) {
return of(current, size, 0L, searchCount);
}
public boolean searchCount() {
return this.total < 0L ? false : this.searchCount;
}
/** @deprecated */
@Deprecated
public String getCountId() {
return this.countId;
}
/** @deprecated */
@Deprecated
public Long getMaxLimit() {
return this.maxLimit;
}
/** @deprecated */
@Deprecated
public List<OrderItem> getOrders() {
return this.orders;
}
/** @deprecated */
@Deprecated
public boolean isOptimizeCountSql() {
return this.optimizeCountSql;
}
/** @deprecated */
@Deprecated
public boolean isSearchCount() {
return this.searchCount;
}
public void setOrders(final List<OrderItem> orders) {
this.orders = orders;
}
public void setOptimizeJoinOfCountSql(final boolean optimizeJoinOfCountSql) {
this.optimizeJoinOfCountSql = optimizeJoinOfCountSql;
}
public void setCountId(final String countId) {
this.countId = countId;
}
public void setMaxLimit(final Long maxLimit) {
this.maxLimit = maxLimit;
}
}
关键是我们要关注一个点 少了的 optimizeJoinOfCountSql
有没有get或者is的方法,嗯 确实是没有的,所以这个就是为什么少了这个参数的原因了,那为什么会多出来一个pages
呢,我们可以看下 Page
这个类 实际上还实现了IPage
这个接口 我们看下
public interface IPage<T> extends Serializable {
List<OrderItem> orders();
default boolean optimizeCountSql() {
return true;
}
default boolean optimizeJoinOfCountSql() {
return true;
}
default boolean searchCount() {
return true;
}
default long offset() {
long current = this.getCurrent();
return current <= 1L ? 0L : Math.max((current - 1L) * this.getSize(), 0L);
}
default Long maxLimit() {
return null;
}
default long getPages() {
if (this.getSize() == 0L) {
return 0L;
} else {
long pages = this.getTotal() / this.getSize();
if (this.getTotal() % this.getSize() != 0L) {
++pages;
}
return pages;
}
}
default IPage<T> setPages(long pages) {
return this;
}
List<T> getRecords();
IPage<T> setRecords(List<T> records);
long getTotal();
IPage<T> setTotal(long total);
long getSize();
IPage<T> setSize(long size);
long getCurrent();
IPage<T> setCurrent(long current);
default <R> IPage<R> convert(Function<? super T, ? extends R> mapper) {
List<R> collect = (List)this.getRecords().stream().map(mapper).collect(Collectors.toList());
return this.setRecords(collect);
}
default String countId() {
return null;
}
}
这里我们就可以发现了,存在一个 getPages
这就是为什么我们会多出pages这个属性的原因了。
总结
综上,dubbo的回放结果根据序列化的情况就可能出现不一样的情况,我们可能需要根据情况进行接口层级的字段忽略对比才行。