大纲
- 新建工程
- 模拟函数
- 自定义无界流
- 背压测试
- 引入数据
- 低压侧
- 高压侧
- 测试结果
- 优化
- 降低算法复杂度
- 提高并行度
- 工程代码
背压(Backpressure)又称“反压”,是指在Flink的处理过程中,某个过程出于某种原因,消耗的上游数据过慢,导致上游数据积压。
本文我们将通过例子来探索背压产生的原因以及处理方法。
新建工程
我们新建一个名字叫Backpressure的工程。
Archetype:org.apache.flink:flink-quickstart-java
版本:1.19.1
模拟函数
我们借助《0基础学习区块链技术——入门》的思想,设计一个函数,用于计算出一个符合“前置N个0”的Hash。
private static String generateHash(String input) {
try {
MessageDigest md = MessageDigest.getInstance("SHA-256");
byte[] hashBytes = md.digest(input.getBytes());
StringBuilder sb = new StringBuilder();
for (byte b : hashBytes) {
sb.append(String.format("%02x", b));
}
while (sb.length() < 64) {
sb.insert(0, "0");
}
return sb.toString();
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("SHA-256 algorithm not found", e);
}
}
private static String generateHash(String input, int zeroCount) {
Long seed = 0L;
String value = input + " seed:" + seed;
String hash = generateHash(value);
String cmpStart = "0".repeat(zeroCount);
while (!hash.startsWith(cmpStart)) {
seed++;
value = input + " seed:" + seed;
hash = generateHash(value);
}
return hash;
}
后续我们通过调整zeroCount来调整计算过程的复杂性。
自定义无界流
通过这个无界数据流,我们可以持续给系统提供数据,进而方便我们测试。
这块代码见《Java版Flink使用指南——自定义无界流生成器》。
package org.example.generator;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
public class UnBoundedStreamGenerator extends RichSourceFunction<Long> {
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Long> ctx) throws Exception {
long count = 0L;
while (isRunning) {
Thread.sleep(10); // Simulate delay
ctx.collect(count++); // Emit data
}
}
@Override
public void cancel() {
isRunning = false;
System.out.println("UnBoundedStreamGenerator canceled");
}
}
背压测试
引入数据
引入的数据就是上面创建的无界数据流,它会每10毫秒产生一条自增的Long型数据。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SingleOutputStreamOperator<Long> customStreamSource = env.addSource(new UnBoundedStreamGenerator()).setParallelism(1).name("Custom Stream Source");
低压侧
我们在低压侧使用复杂度为1的算法,即要求算出的Hash值最前面一位是0即可。
int parallelism_low = 2;
int complexity_low = 1;
customStreamSource.keyBy(fields -> fields % parallelism_low).countWindow(10).apply(new WindowFunction<Long, String, Long, GlobalWindow>(){
@Override
public void apply(Long key, GlobalWindow window, Iterable<Long> input, Collector<String> out) throws Exception {
String newLine = "";
for (Long element : input) {
String hash = generateHash(element.toString(), complexity_low);
newLine = newLine + " " + hash;
}
out.collect(newLine);
}
}).setParallelism(parallelism_low).name("complexity = " + complexity_low).print().name("print complexity = " + complexity_low);
高压侧
高压侧的复杂度是3,即要求算出的Hash值前3为是0。
int parallelism_high = 2;
int complexity_high = 3;
customStreamSource.keyBy(fields -> fields % parallelism_high).countWindow(10).apply(new WindowFunction<Long, String, Long, GlobalWindow>(){
@Override
public void apply(Long key, GlobalWindow window, Iterable<Long> input, Collector<String> out) throws Exception {
String newLine = "";
for (Long element : input) {
String hash = generateHash(element.toString(), complexity_high);
newLine = newLine + " " + hash;
}
out.collect(newLine);
}
}).setParallelism(parallelism_high).name("complexity = " + complexity_high).print().name("print complexity = " + complexity_high);
测试结果
刚开始时,complexity = 3,即高压侧的过程是红色。
其处理速度不到低压侧(complexity = 1)的一半。目前缓冲区还没满,所以Source侧还没有背压。
这种情况持续下去,会导致Source过程产生背压,即数据拥堵。
过了一段时间,Source侧显示出已经处于“LOW”状态的低压了。
但是很快,它就会变成“HIGH”状态。
此时,可以看到Flink还可以继续运行,但是JVM的内存在持续增长。
优化
降低算法复杂度
由于complexity = 3时,算法复杂度比较高,才导致了背压。所以最简单的办法就是修改算法,降低其复杂度。
我们将算法复杂度调到1。
int parallelism_high = 2;
int complexity_high = 1;
customStreamSource.keyBy(fields -> fields % parallelism_high).countWindow(10).apply(new WindowFunction<Long, String, Long, GlobalWindow>(){
@Override
public void apply(Long key, GlobalWindow window, Iterable<Long> input, Collector<String> out) throws Exception {
String newLine = "";
for (Long element : input) {
String hash = generateHash(element.toString(), complexity_high);
newLine = newLine + " " + hash;
}
out.collect(newLine);
}
}).setParallelism(parallelism_high).name("complexity = " + complexity_high).print().name("print complexity = " + complexity_high);
可以看到不会有背压产生。
提高并行度
如果算法就是很复杂,不能提升效率,那就要调动更多的CPU资源来计算。此时我们可以提升并行度来达成。
如下代码,我们将计算的并行度调整为9。
int parallelism_high = 9;
int complexity_high = 3;
customStreamSource.keyBy(fields -> fields % parallelism_high).countWindow(10).apply(new WindowFunction<Long, String, Long, GlobalWindow>(){
@Override
public void apply(Long key, GlobalWindow window, Iterable<Long> input, Collector<String> out) throws Exception {
String newLine = "";
for (Long element : input) {
String hash = generateHash(element.toString(), complexity_high);
newLine = newLine + " " + hash;
}
out.collect(newLine);
}
}).setParallelism(parallelism_high).name("complexity = " + complexity_high).print().name("print complexity = " + complexity_high);
则背压也不会产生
工程代码
https://github.com/f304646673/FlinkDemo