记录一次使用Flink 异步调用IO 总是超时的bug
注:博主使用的版本就是:<flink.version>1.16.1</flink.version>
起因:
因公司业务需要,使用 Flink 对数据进行流式处理。具体处理流程是:从 Kafka 接收数据,然后连续请求十多个接口(算法)对数据进行打标。
主程序:
具体的异步IO代码(随便找一个展示):
package com.wenge.datagroup.storage.process;
import com.alibaba.fastjson.JSONObject;
import com.wenge.datagroup.storage.bean.ParamConfig;
import com.wenge.datagroup.storage.common.ArgsConstants;
import com.wenge.datagroup.storage.process.base.BaseETL;
import com.wenge.datagroup.storage.service.YaYiService.YaYiPolarityService;
import com.wenge.datagroup.storage.utils.ConfigUtil;
import com.wenge.datagroup.storage.utils.Funnel;
import com.wenge.datagroup.storage.utils.YaYiUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
/**
 * Wires an asynchronous polarity-tagging operator into a Flink stream.
 *
 * <p>Each record's {@code translate_content} is sent to {@link YaYiPolarityService}; the
 * returned label is mapped to an integer and stored under {@code analysis.polarity}.
 * Records are always emitted, tagged or not: failures and timeouts pass the input
 * record through unchanged instead of failing the job.
 */
@Slf4j
public class AnalyerAsyncIOProcessPolarity {

    /** Per-record timeout of the asynchronous operator, in milliseconds. */
    private static final long ASYNC_TIMEOUT_MILLIS = 50000L;

    /**
     * Appends the asynchronous polarity operator to {@code dataStream}.
     *
     * <p>Topic, config file, async capacity and operator parallelism are read from
     * {@code Funnel} when present there, otherwise from defaults / {@code ConfigUtil}.
     *
     * @param dataStream upstream records; each is expected to carry {@code uuid} and,
     *                   optionally, {@code translate_content} and {@code analysis}
     * @return the stream of records after polarity tagging (unordered)
     */
    public static DataStream<JSONObject> process(DataStream<JSONObject> dataStream) {
        log.error("----------------------------开始异步IO处理----------------");
        final String topic = Funnel.contains(ArgsConstants.TOPIC)
                ? Funnel.getString(ArgsConstants.TOPIC)
                : "";
        final String configFile = Funnel.contains(ArgsConstants.CONFIG)
                ? Funnel.getString(ArgsConstants.CONFIG)
                : "config.properties";
        final int asyncNum = Funnel.contains(ArgsConstants.ASYNC_NUM)
                ? Funnel.getInt(ArgsConstants.ASYNC_NUM)
                : ConfigUtil.getInteger(ArgsConstants.ASYNC_NUM);
        final int parallelism = Funnel.contains(ArgsConstants.Translate_MAP_PARALLELISM)
                ? Funnel.getInt(ArgsConstants.Translate_MAP_PARALLELISM)
                : ConfigUtil.getInteger(ArgsConstants.Translate_MAP_PARALLELISM);

        // Async I/O operator. Parameterized (not raw) so unorderedWait is type-checked.
        RichAsyncFunction<JSONObject, JSONObject> richAsyncFunction =
                new RichAsyncFunction<JSONObject, JSONObject>() {
            private transient ExecutorService executorService;
            private ParamConfig paramConfig;
            private YaYiUtil yaYiUtil;

            /** Reloads configuration on the task side and creates the worker pool. */
            @Override
            public void open(Configuration parameters) {
                log.error("重新加载配置文件");
                ConfigUtil.setConfigFile(configFile);
                ConfigUtil.setTopic(topic);
                ConfigUtil.init();
                this.executorService = Executors.newFixedThreadPool(asyncNum);
                paramConfig = new ParamConfig(
                        ConfigUtil.getString("YaYiappKey"), ConfigUtil.getString("YaYiappSecret"));
                yaYiUtil = new YaYiUtil(paramConfig);
            }

            /** Shuts the worker pool down when the operator is closed. */
            @Override
            public void close() throws Exception {
                if (executorService != null) {
                    executorService.shutdown();
                }
                log.error("----------------------------情感分析-线程池关闭----------------------");
            }

            /** On timeout, logs the record's uuid and emits the record as-is. */
            @Override
            public void timeout(JSONObject input, ResultFuture<JSONObject> resultFuture) {
                log.error("-----------------------数据超时----------------------:{}", input.getString("uuid"));
                resultFuture.complete(Collections.singleton(input));
            }

            /**
             * Runs the tagging on the worker pool and completes {@code resultFuture}
             * on BOTH the success and the failure path.
             */
            @Override
            public void asyncInvoke(JSONObject json, ResultFuture<JSONObject> resultFuture) {
                CompletableFuture
                        .supplyAsync(() -> tagPolarity(json), executorService)
                        .whenComplete((tagged, error) -> {
                            if (error != null) {
                                // Reached for Throwables that tagPolarity's catch (Exception)
                                // does not cover (e.g. NoClassDefFoundError). With thenAccept
                                // the future was never completed and the record silently
                                // waited for the operator timeout.
                                log.error("--------异步任务异常终止,原样输出数据,uuid:{}",
                                        json.getString("uuid"), error);
                                resultFuture.complete(Collections.singleton(json));
                            } else {
                                resultFuture.complete(Collections.singleton(tagged));
                            }
                        });
            }

            /**
             * Calls the polarity service for {@code translate_content} and writes the
             * mapped value ("A" -> 0, "B" -> 1, anything else -> 2) to
             * {@code analysis.polarity}. Returns the same (mutated) record; on any
             * {@link Exception} the record is returned without a polarity.
             */
            private JSONObject tagPolarity(JSONObject json) {
                String uuid = json.getString("uuid");
                long start = System.currentTimeMillis();
                try {
                    String translateContent = json.getString("translate_content");
                    if (StringUtils.isBlank(translateContent)) {
                        return json;
                    }
                    String polarity = new YaYiPolarityService().yaYiPolarity(translateContent);
                    if (StringUtils.isNotBlank(polarity)) {
                        int polarityCode = StringUtils.equals(polarity, "A")
                                ? 0
                                : StringUtils.equals(polarity, "B") ? 1 : 2;
                        // Create the "analysis" object when the record does not have one yet.
                        JSONObject analysis = json.getJSONObject("analysis");
                        if (analysis == null) {
                            analysis = new JSONObject();
                        }
                        analysis.put("polarity", polarityCode);
                        json.put("analysis", analysis);
                    }
                    log.error("uuid:{},分析后数据:{}", uuid, polarity);
                    log.error("uuid:{},分析,耗时:{} ms", uuid, System.currentTimeMillis() - start);
                    return json;
                } catch (Exception e) {
                    // Best effort: keep the record flowing; the trailing Throwable
                    // argument makes SLF4J print the stack trace.
                    log.error("--------分析异常,uuid:{}", uuid, e);
                    return json;
                }
            }
        };

        return AsyncDataStream
                .unorderedWait(dataStream, richAsyncFunction, ASYNC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS, asyncNum)
                .name("qinggan")
                .setParallelism(parallelism);
    }
}
结果程序运行时,总是出现大批量的异步 IO 调用算法接口超时:
分析:
因为开发,编译,以及运行时没有任何异常,所以一度考虑过网络问题,接口问题,还有Flink集群问题。
结果一一排查后发现都不是,只能一点点 debug 代码,最后发现问题出在异步任务中调用 StringUtils 的地方:
每次代码运行到这里就会卡住,直到超过预设的超时时间,然后执行异步 IO 的超时方法(timeout)。
解决:
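因为这里的 StringUtils 原先用的是 import org.apache.commons.lang.StringUtils(commons-lang 2.x),只需要换成 import org.apache.commons.lang3.StringUtils 就可以了(上面贴出的代码已经是修改后的版本)。
很无奈这个事实,因为时间太紧,没有继续深究,当时估计是老的 StringUtils 方法中使用到了加锁的方法。不过更可能的原因是(未经验证):旧的 commons-lang 类在集群运行时的 classpath 中缺失或冲突,调用时抛出 NoClassDefFoundError 这类 Error;它不会被 catch (Exception e) 捕获,CompletableFuture 异常结束后 thenAccept 不会执行,resultFuture 一直没有被 complete,于是只能等到超时。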
注:博主使用的版本就是:<flink.version>1.16.1</flink.version>