背景
在Flink中,我们有时会使用RoaringBitmap进行统计计数等操作。使用RoaringBitmap时,最重要的问题是如何对它进行序列化:无论是网络传输还是状态(state)存储,都需要先把对象序列化。本文的重点是对比Kryo使用默认序列化器和使用自定义序列化器来序列化RoaringBitmap时的性能差异。
性能对比
在Flink中使用RoaringBitmap时,Flink自带的序列化器无法处理这个类,只能交给Kryo进行序列化。Kryo默认使用FieldSerializer对对象进行序列化:序列化RoaringBitmap时,它会对RoaringBitmap内部的每个字段分别调用对应的序列化器进行序列化/反序列化。
但这样做的性能并不高,因为RoaringBitmap内部数组中的元素并不都需要序列化,而是可以根据实际情况决定。为此,RoaringBitmap本身提供了serialize/deserialize方法,相比逐字段序列化有极大的性能提升。所以我们需要实现自己的Kryo序列化器,直接调用RoaringBitmap提供的serialize/deserialize方法。以下是得到的性能测试结果:
附测试代码:
/**
 * Measures throughput of shipping {@code BitMapWrapper} records through Kryo with no serializer
 * registered for {@link RoaringBitmap}, i.e. whatever Kryo picks for the class by default.
 */
@Benchmark
@OperationsPerInvocation(value = SerializationFrameworkMiniBenchmarks.RECORDS_PER_INVOCATION_SER)
public void serializerKryoBitmap(FlinkEnvironmentContext context) throws Exception {
    runBitmapPipeline(context, false);
}

/**
 * Same pipeline as {@link #serializerKryoBitmap}, but with {@code BitMapSerializer} registered as
 * the Kryo serializer for {@link RoaringBitmap}.
 */
@Benchmark
@OperationsPerInvocation(value = SerializationFrameworkMiniBenchmarks.RECORDS_PER_INVOCATION_SER)
public void serializerKryoBitmapSerializer(FlinkEnvironmentContext context) throws Exception {
    runBitmapPipeline(context, true);
}

/**
 * Builds and executes the job shared by both bitmap benchmarks: a parallel {@code
 * BitMapWrapperSource}, a {@code rebalance()} exchange, and a validating {@code SelfBitMapSink}.
 *
 * <p>Keeping the pipeline in one place guarantees the two benchmark variants differ only in the
 * Kryo serializer registration, which is the thing being compared.
 *
 * <p>Deliberately an instance method: {@code SelfBitMapSink} may be a non-static inner class,
 * which cannot be instantiated from a static context.
 *
 * @param context benchmark environment providing the {@code StreamExecutionEnvironment}
 * @param registerBitmapSerializer whether to register {@code BitMapSerializer} for
 *     {@link RoaringBitmap} with Kryo
 * @throws Exception if job execution fails (including validation failures thrown by the sink)
 */
private void runBitmapPipeline(FlinkEnvironmentContext context, boolean registerBitmapSerializer)
        throws Exception {
    StreamExecutionEnvironment env = context.env;
    env.setParallelism(4);
    ExecutionConfig executionConfig = env.getConfig();
    // Force Kryo for all generic types so RoaringBitmap is always handled by Kryo.
    executionConfig.enableForceKryo();
    if (registerBitmapSerializer) {
        executionConfig.registerTypeWithKryoSerializer(RoaringBitmap.class, BitMapSerializer.class);
    }
    // The key count (10) must stay within the id range that SelfBitMapSink accepts.
    env.addSource(new BitMapWrapperSource(RECORDS_PER_INVOCATION_SER, 10))
            .rebalance()
            .addSink(new SelfBitMapSink<BitMapWrapper>());
    env.execute();
}
/**
 * Parallel source emitting {@link BitMapWrapper} records for the bitmap serialization benchmarks.
 *
 * <p>One wrapper is built in {@link #init()} and the same instance is returned for every record;
 * only its id is updated on each {@link #getElement(int)} call.
 */
public static class BitMapWrapperSource extends BaseSourceWithKeyRange<BitMapWrapper> {
    private static final long serialVersionUID = 2941333602938145599L;

    /** Reused record; transient, so it is not part of the serialized source and is built by init(). */
    private transient BitMapWrapper template;

    public BitMapWrapperSource(int numEvents, int numKeys) {
        super(numEvents, numKeys);
    }

    @Override
    protected void init() {
        super.init();
        this.template = initNewBitMapWrapper(0);
    }

    @Override
    protected BitMapWrapper getElement(int keyId) {
        BitMapWrapper reused = this.template;
        reused.setId(keyId);
        return reused;
    }

    /**
     * Creates a wrapper whose bitmap holds {@code BITMAP_INIT_NUM} distinct values: the first half
     * offset from 1000000, the second half offset from 9000000, then run-optimized.
     *
     * @param keyId id assigned to the new wrapper
     * @return the freshly built wrapper
     */
    private BitMapWrapper initNewBitMapWrapper(int keyId) {
        BitMapWrapper wrapper = new BitMapWrapper();
        RoaringBitmap bitmap = new RoaringBitmap();
        int half = BITMAP_INIT_NUM / 2;
        // Single pass in the same order as two consecutive loops would insert the values.
        for (int offset = 0; offset < BITMAP_INIT_NUM; offset++) {
            int base = offset < half ? 1000000 : 9000000;
            bitmap.add(base + offset);
        }
        bitmap.runOptimize();
        wrapper.setR1(bitmap);
        wrapper.setId(keyId);
        return wrapper;
    }
}
public class SelfBitMapSink<BitMapWrapper> implements SinkFunction<org.apache.flink.benchmark.selfbitmap.BitMapWrapper>, SupportsConcurrentExecutionAttempts {
private static final long serialVersionUID = 1L;
public SelfBitMapSink() {
}
public void invoke(org.apache.flink.benchmark.selfbitmap.BitMapWrapper value) throws Exception{
if (value.getId() > 10 || value.getId() < 0) {
throw new Exception("id is illegal" + value.getId());
}
if(value.getR1().getCardinality() != SerializationFrameworkMiniBenchmarks.BITMAP_INIT_NUM){
throw new Exception("bitmap error" + value.getR1().getCardinality());
}
}
}